001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.File;
020import java.io.IOException;
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Locale;
026import java.util.Set;
027import java.util.concurrent.ScheduledFuture;
028import java.util.concurrent.ScheduledThreadPoolExecutor;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.TimeUnit;
031
032import javax.sql.DataSource;
033
034import org.apache.activemq.ActiveMQMessageAudit;
035import org.apache.activemq.broker.BrokerService;
036import org.apache.activemq.broker.ConnectionContext;
037import org.apache.activemq.broker.Locker;
038import org.apache.activemq.broker.scheduler.JobSchedulerStore;
039import org.apache.activemq.command.ActiveMQDestination;
040import org.apache.activemq.command.ActiveMQQueue;
041import org.apache.activemq.command.ActiveMQTopic;
042import org.apache.activemq.command.Message;
043import org.apache.activemq.command.MessageAck;
044import org.apache.activemq.command.MessageId;
045import org.apache.activemq.command.ProducerId;
046import org.apache.activemq.openwire.OpenWireFormat;
047import org.apache.activemq.store.MessageStore;
048import org.apache.activemq.store.PersistenceAdapter;
049import org.apache.activemq.store.TopicMessageStore;
050import org.apache.activemq.store.TransactionStore;
051import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
052import org.apache.activemq.store.memory.MemoryTransactionStore;
053import org.apache.activemq.usage.SystemUsage;
054import org.apache.activemq.util.ByteSequence;
055import org.apache.activemq.util.FactoryFinder;
056import org.apache.activemq.util.IOExceptionSupport;
057import org.apache.activemq.util.LongSequenceGenerator;
058import org.apache.activemq.util.ServiceStopper;
059import org.apache.activemq.util.ThreadPoolUtils;
060import org.apache.activemq.wireformat.WireFormat;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * A {@link PersistenceAdapter} implementation using JDBC for persistence
066 * storage.
067 *
068 * This persistence adapter will correctly remember prepared XA transactions,
069 * but it will not keep track of local transaction commits so that operations
070 * performed against the Message store are done as a single uow.
071 *
072 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
073 *
074 */
075public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
076
077    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
078    private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
079        "META-INF/services/org/apache/activemq/store/jdbc/");
080    private static FactoryFinder lockFactoryFinder = new FactoryFinder(
081        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
082
083    public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
084
085    private WireFormat wireFormat = new OpenWireFormat();
086    private Statements statements;
087    private JDBCAdapter adapter;
088    private final JdbcMemoryTransactionStore transactionStore = new JdbcMemoryTransactionStore(this);
089    private ScheduledFuture<?> cleanupTicket;
090    private int cleanupPeriod = 1000 * 60 * 5;
091    private boolean useExternalMessageReferences;
092    private boolean createTablesOnStartup = true;
093    private DataSource lockDataSource;
094    private int transactionIsolation;
095    private File directory;
096    private boolean changeAutoCommitAllowed = true;
097
098    protected int maxProducersToAudit=1024;
099    protected int maxAuditDepth=1000;
100    protected boolean enableAudit=false;
101    protected int auditRecoveryDepth = 1024;
102    protected ActiveMQMessageAudit audit;
103
104    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
105    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
106    protected final HashMap<ActiveMQDestination, MessageStore> storeCache = new HashMap<>();
107
108    {
109        setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
110    }
111
112    public JDBCPersistenceAdapter() {
113    }
114
115    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
116        super(ds);
117        this.wireFormat = wireFormat;
118    }
119
120    @Override
121    public Set<ActiveMQDestination> getDestinations() {
122        TransactionContext c = null;
123        try {
124            c = getTransactionContext();
125            return getAdapter().doGetDestinations(c);
126        } catch (IOException e) {
127            return emptyDestinationSet();
128        } catch (SQLException e) {
129            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
130            return emptyDestinationSet();
131        } finally {
132            if (c != null) {
133                try {
134                    c.close();
135                } catch (Throwable e) {
136                }
137            }
138        }
139    }
140
141    @SuppressWarnings("unchecked")
142    private Set<ActiveMQDestination> emptyDestinationSet() {
143        return Collections.EMPTY_SET;
144    }
145
146    protected void createMessageAudit() {
147        if (enableAudit && audit == null) {
148            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
149            TransactionContext c = null;
150
151            try {
152                c = getTransactionContext();
153                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
154                    @Override
155                    public void messageId(MessageId id) {
156                        audit.isDuplicate(id);
157                    }
158                });
159            } catch (Exception e) {
160                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
161            } finally {
162                if (c != null) {
163                    try {
164                        c.close();
165                    } catch (Throwable e) {
166                    }
167                }
168            }
169        }
170    }
171
172    public void initSequenceIdGenerator() {
173        TransactionContext c = null;
174        try {
175            c = getTransactionContext();
176            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
177                @Override
178                public void messageId(MessageId id) {
179                    audit.isDuplicate(id);
180                }
181            });
182        } catch (Exception e) {
183            LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
184        } finally {
185            if (c != null) {
186                try {
187                    c.close();
188                } catch (Throwable e) {
189                }
190            }
191        }
192    }
193
194    @Override
195    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
196        MessageStore rc = storeCache.get(destination);
197        if (rc == null) {
198            MessageStore store = transactionStore.proxy(new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit));
199            synchronized (storeCache) {
200                rc = storeCache.put(destination, store);
201                if (rc != null) {
202                    storeCache.put(destination, rc);
203                } else {
204                    rc = store;
205                }
206            }
207        }
208        return rc;
209    }
210
211    @Override
212    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
213        TopicMessageStore rc = (TopicMessageStore) storeCache.get(destination);
214        if (rc == null) {
215            TopicMessageStore store = transactionStore.proxy(new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit));
216            synchronized (storeCache) {
217                rc = (TopicMessageStore) storeCache.put(destination, store);
218                if (rc != null) {
219                    storeCache.put(destination, rc);
220                } else {
221                    rc = store;
222                }
223            }
224        }
225        return rc;
226    }
227
228    /**
229     * Cleanup method to remove any state associated with the given destination
230     * @param destination Destination to forget
231     */
232    @Override
233    public void removeQueueMessageStore(ActiveMQQueue destination) {
234        if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
235            try {
236                removeConsumerDestination(destination);
237            } catch (IOException ioe) {
238                LOG.error("Failed to remove consumer destination: " + destination, ioe);
239            }
240        }
241        storeCache.remove(destination);
242    }
243
244    private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
245        TransactionContext c = getTransactionContext();
246        try {
247            String id = destination.getQualifiedName();
248            getAdapter().doDeleteSubscription(c, destination, id, id);
249        } catch (SQLException e) {
250            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
251            throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
252        } finally {
253            c.close();
254        }
255    }
256
257    /**
258     * Cleanup method to remove any state associated with the given destination
259     * No state retained.... nothing to do
260     *
261     * @param destination Destination to forget
262     */
263    @Override
264    public void removeTopicMessageStore(ActiveMQTopic destination) {
265        storeCache.remove(destination);
266    }
267
268    @Override
269    public TransactionStore createTransactionStore() throws IOException {
270        return this.transactionStore;
271    }
272
273    @Override
274    public long getLastMessageBrokerSequenceId() throws IOException {
275        TransactionContext c = getTransactionContext();
276        try {
277            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
278            sequenceGenerator.setLastSequenceId(seq);
279            long brokerSeq = 0;
280            if (seq != 0) {
281                byte[] msg = getAdapter().doGetMessageById(c, seq);
282                if (msg != null) {
283                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
284                    brokerSeq = last.getMessageId().getBrokerSequenceId();
285                } else {
286                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
287                }
288            }
289            return brokerSeq;
290        } catch (SQLException e) {
291            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
292            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
293        } finally {
294            c.close();
295        }
296    }
297
298    @Override
299    public long getLastProducerSequenceId(ProducerId id) throws IOException {
300        TransactionContext c = getTransactionContext();
301        try {
302            return getAdapter().doGetLastProducerSequenceId(c, id);
303        } catch (SQLException e) {
304            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
305            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
306        } finally {
307            c.close();
308        }
309    }
310
311    @Override
312    public void allowIOResumption() {}
313
314    @Override
315    public void init() throws Exception {
316        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
317
318        if (isCreateTablesOnStartup()) {
319            TransactionContext transactionContext = getTransactionContext();
320            transactionContext.getExclusiveConnection();
321            transactionContext.begin();
322            try {
323                try {
324                    getAdapter().doCreateTables(transactionContext);
325                } catch (SQLException e) {
326                    LOG.warn("Cannot create tables due to: " + e);
327                    JDBCPersistenceAdapter.log("Failure Details: ", e);
328                }
329            } finally {
330                transactionContext.commit();
331            }
332        }
333    }
334
335    @Override
336    public void doStart() throws Exception {
337
338        if( brokerService!=null ) {
339          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
340        }
341
342        // Cleanup the db periodically.
343        if (cleanupPeriod > 0) {
344            cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
345                @Override
346                public void run() {
347                    cleanup();
348                }
349            }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
350        }
351        createMessageAudit();
352    }
353
354    @Override
355    public synchronized void doStop(ServiceStopper stopper) throws Exception {
356        if (cleanupTicket != null) {
357            cleanupTicket.cancel(true);
358            cleanupTicket = null;
359        }
360        closeDataSource(getDataSource());
361    }
362
363    public void cleanup() {
364        TransactionContext c = null;
365        try {
366            LOG.debug("Cleaning up old messages.");
367            c = getTransactionContext();
368            c.getExclusiveConnection();
369            getAdapter().doDeleteOldMessages(c);
370        } catch (IOException e) {
371            LOG.warn("Old message cleanup failed due to: " + e, e);
372        } catch (SQLException e) {
373            LOG.warn("Old message cleanup failed due to: " + e);
374            JDBCPersistenceAdapter.log("Failure Details: ", e);
375        } finally {
376            if (c != null) {
377                try {
378                    c.close();
379                } catch (Throwable e) {
380                }
381            }
382            LOG.debug("Cleanup done.");
383        }
384    }
385
386    @Override
387    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
388        if (clockDaemon == null) {
389            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
390                @Override
391                public Thread newThread(Runnable runnable) {
392                    Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task");
393                    thread.setDaemon(true);
394                    return thread;
395                }
396            });
397        }
398        return clockDaemon;
399    }
400
401    public JDBCAdapter getAdapter() throws IOException {
402        if (adapter == null) {
403            setAdapter(createAdapter());
404        }
405        return adapter;
406    }
407
408    /**
409     * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
410     */
411    @Deprecated
412    public Locker getDatabaseLocker() throws IOException {
413        return getLocker();
414    }
415
416    /**
417     * Sets the database locker strategy to use to lock the database on startup
418     * @throws IOException
419     *
420     * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
421     */
422    @Deprecated
423    public void setDatabaseLocker(Locker locker) throws IOException {
424        setLocker(locker);
425    }
426
427    public DataSource getLockDataSource() throws IOException {
428        if (lockDataSource == null) {
429            lockDataSource = getDataSource();
430            if (lockDataSource == null) {
431                throw new IllegalArgumentException(
432                        "No dataSource property has been configured");
433            }
434        }
435        return lockDataSource;
436    }
437
438    public void setLockDataSource(DataSource dataSource) {
439        this.lockDataSource = dataSource;
440        LOG.info("Using a separate dataSource for locking: "
441                            + lockDataSource);
442    }
443
444    @Override
445    public BrokerService getBrokerService() {
446        return brokerService;
447    }
448
449    /**
450     * @throws IOException
451     */
452    protected JDBCAdapter createAdapter() throws IOException {
453
454        adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
455
456        // Use the default JDBC adapter if the
457        // Database type is not recognized.
458        if (adapter == null) {
459            adapter = new DefaultJDBCAdapter();
460            LOG.debug("Using default JDBC Adapter: " + adapter);
461        }
462        return adapter;
463    }
464
465    private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
466        Object adapter = null;
467        TransactionContext c = getTransactionContext();
468        try {
469            try {
470                // Make the filename file system safe.
471                String dirverName = c.getConnection().getMetaData().getDriverName();
472                dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
473
474                try {
475                    adapter = finder.newInstance(dirverName);
476                    LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
477                } catch (Throwable e) {
478                    LOG.info("Database " + kind + " driver override not found for : [" + dirverName
479                             + "].  Will use default implementation.");
480                }
481            } catch (SQLException e) {
482                LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
483                          + e.getMessage());
484                JDBCPersistenceAdapter.log("Failure Details: ", e);
485            }
486        } finally {
487            c.close();
488        }
489        return adapter;
490    }
491
492    public void setAdapter(JDBCAdapter adapter) {
493        this.adapter = adapter;
494        this.adapter.setStatements(getStatements());
495        this.adapter.setMaxRows(getMaxRows());
496    }
497
498    public WireFormat getWireFormat() {
499        return wireFormat;
500    }
501
502    public void setWireFormat(WireFormat wireFormat) {
503        this.wireFormat = wireFormat;
504    }
505
506    public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
507        if (context == null || isBrokerContext(context)) {
508            return getTransactionContext();
509        } else {
510            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
511            if (answer == null) {
512                answer = getTransactionContext();
513                context.setLongTermStoreContext(answer);
514            }
515            return answer;
516        }
517    }
518
519    private boolean isBrokerContext(ConnectionContext context) {
520        return context.getSecurityContext() != null && context.getSecurityContext().isBrokerContext();
521    }
522
523    public TransactionContext getTransactionContext() throws IOException {
524        TransactionContext answer = new TransactionContext(this);
525        if (transactionIsolation > 0) {
526            answer.setTransactionIsolation(transactionIsolation);
527        }
528        return answer;
529    }
530
531    @Override
532    public void beginTransaction(ConnectionContext context) throws IOException {
533        TransactionContext transactionContext = getTransactionContext(context);
534        transactionContext.begin();
535    }
536
537    @Override
538    public void commitTransaction(ConnectionContext context) throws IOException {
539        TransactionContext transactionContext = getTransactionContext(context);
540        transactionContext.commit();
541    }
542
543    @Override
544    public void rollbackTransaction(ConnectionContext context) throws IOException {
545        TransactionContext transactionContext = getTransactionContext(context);
546        transactionContext.rollback();
547    }
548
549    public int getCleanupPeriod() {
550        return cleanupPeriod;
551    }
552
553    /**
554     * Sets the number of milliseconds until the database is attempted to be
555     * cleaned up for durable topics
556     */
557    public void setCleanupPeriod(int cleanupPeriod) {
558        this.cleanupPeriod = cleanupPeriod;
559    }
560
561    public boolean isChangeAutoCommitAllowed() {
562        return changeAutoCommitAllowed;
563    }
564
565    /**
566     * Whether the JDBC driver allows to set the auto commit.
567     * Some drivers does not allow changing the auto commit. The default value is true.
568     *
569     * @param changeAutoCommitAllowed true to change, false to not change.
570     */
571    public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) {
572        this.changeAutoCommitAllowed = changeAutoCommitAllowed;
573    }
574
575    @Override
576    public void deleteAllMessages() throws IOException {
577        TransactionContext c = getTransactionContext();
578        c.getExclusiveConnection();
579        try {
580            getAdapter().doDropTables(c);
581            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
582            getAdapter().doCreateTables(c);
583            LOG.info("Persistence store purged.");
584        } catch (SQLException e) {
585            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
586            throw IOExceptionSupport.create(e);
587        } finally {
588            c.close();
589        }
590    }
591
592    public boolean isUseExternalMessageReferences() {
593        return useExternalMessageReferences;
594    }
595
596    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
597        this.useExternalMessageReferences = useExternalMessageReferences;
598    }
599
600    public boolean isCreateTablesOnStartup() {
601        return createTablesOnStartup;
602    }
603
604    /**
605     * Sets whether or not tables are created on startup
606     */
607    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
608        this.createTablesOnStartup = createTablesOnStartup;
609    }
610
611    /**
612     * @deprecated use {@link #setUseLock(boolean)} instead
613     *
614     * Sets whether or not an exclusive database lock should be used to enable
615     * JDBC Master/Slave. Enabled by default.
616     */
617    @Deprecated
618    public void setUseDatabaseLock(boolean useDatabaseLock) {
619        setUseLock(useDatabaseLock);
620    }
621
622    public static void log(String msg, SQLException e) {
623        String s = msg + e.getMessage();
624        while (e.getNextException() != null) {
625            e = e.getNextException();
626            s += ", due to: " + e.getMessage();
627        }
628        LOG.warn(s, e);
629    }
630
631    public Statements getStatements() {
632        if (statements == null) {
633            statements = new Statements();
634        }
635        return statements;
636    }
637
638    public void setStatements(Statements statements) {
639        this.statements = statements;
640        if (adapter != null) {
641            this.adapter.setStatements(getStatements());
642        }
643    }
644
645    /**
646     * @param usageManager The UsageManager that is controlling the
647     *                destination's memory usage.
648     */
649    @Override
650    public void setUsageManager(SystemUsage usageManager) {
651    }
652
653    @Override
654    public Locker createDefaultLocker() throws IOException {
655        Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
656        if (locker == null) {
657            locker = new DefaultDatabaseLocker();
658            LOG.debug("Using default JDBC Locker: " + locker);
659        }
660        locker.configure(this);
661        return locker;
662    }
663
664    @Override
665    public void setBrokerName(String brokerName) {
666    }
667
668    @Override
669    public String toString() {
670        return "JDBCPersistenceAdapter(" + super.toString() + ")";
671    }
672
673    @Override
674    public void setDirectory(File dir) {
675        this.directory=dir;
676    }
677
678    @Override
679    public File getDirectory(){
680        if (this.directory==null && brokerService != null){
681            this.directory=brokerService.getBrokerDataDirectory();
682        }
683        return this.directory;
684    }
685
686    // interesting bit here is proof that DB is ok
687    @Override
688    public void checkpoint(boolean sync) throws IOException {
689        // by pass TransactionContext to avoid IO Exception handler
690        Connection connection = null;
691        try {
692            connection = getDataSource().getConnection();
693            if (!connection.isValid(10)) {
694                throw new IOException("isValid(10) failed for: " + connection);
695            }
696        } catch (SQLException e) {
697            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
698            throw IOExceptionSupport.create(e);
699        } finally {
700            if (connection != null) {
701                try {
702                    connection.close();
703                } catch (Throwable ignored) {
704                }
705            }
706        }
707    }
708
709    @Override
710    public long size(){
711        return 0;
712    }
713
714    /**
715     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
716     *
717     * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
718     * not applied if DataBaseLocker is injected.
719     *
720     */
721    @Deprecated
722    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
723        getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
724    }
725
726    /**
727     * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
728     * This allowable dirty isolation level may not be achievable in clustered DB environments
729     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
730     * see isolation level constants in {@link java.sql.Connection}
731     * @param transactionIsolation the isolation level to use
732     */
733    public void setTransactionIsolation(int transactionIsolation) {
734        this.transactionIsolation = transactionIsolation;
735    }
736
737    public int getMaxProducersToAudit() {
738        return maxProducersToAudit;
739    }
740
741    public void setMaxProducersToAudit(int maxProducersToAudit) {
742        this.maxProducersToAudit = maxProducersToAudit;
743    }
744
745    public int getMaxAuditDepth() {
746        return maxAuditDepth;
747    }
748
749    public void setMaxAuditDepth(int maxAuditDepth) {
750        this.maxAuditDepth = maxAuditDepth;
751    }
752
753    public boolean isEnableAudit() {
754        return enableAudit;
755    }
756
757    public void setEnableAudit(boolean enableAudit) {
758        this.enableAudit = enableAudit;
759    }
760
761    public int getAuditRecoveryDepth() {
762        return auditRecoveryDepth;
763    }
764
765    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
766        this.auditRecoveryDepth = auditRecoveryDepth;
767    }
768
769    public long getNextSequenceId() {
770        return sequenceGenerator.getNextSequenceId();
771    }
772
773    public int getMaxRows() {
774        return maxRows;
775    }
776
777    /*
778     * the max rows return from queries, with sparse selectors this may need to be increased
779     */
780    public void setMaxRows(int maxRows) {
781        this.maxRows = maxRows;
782    }
783
784    public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
785        TransactionContext c = getTransactionContext();
786        try {
787            getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
788        } catch (SQLException e) {
789            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
790            throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
791        } finally {
792            c.close();
793        }
794    }
795
796    public void commitAdd(ConnectionContext context, final MessageId messageId, final long preparedSequenceId, final long newSequence) throws IOException {
797        TransactionContext c = getTransactionContext(context);
798        try {
799            getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence);
800        } catch (SQLException e) {
801            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
802            throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
803        } finally {
804            c.close();
805        }
806    }
807
808    public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
809        TransactionContext c = getTransactionContext(context);
810        try {
811            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getEntryLocator(), null);
812        } catch (SQLException e) {
813            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
814            throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
815        } finally {
816            c.close();
817        }
818    }
819
820    public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
821        TransactionContext c = getTransactionContext(context);
822        try {
823            getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
824        } catch (SQLException e) {
825            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
826            throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
827        } finally {
828            c.close();
829        }
830    }
831
832    public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
833        TransactionContext c = getTransactionContext(context);
834        try {
835            byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
836            getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
837        } catch (SQLException e) {
838            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
839            throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
840        } finally {
841            c.close();
842        }
843    }
844
845    // after recovery there is no record of the original messageId for the ack
846    public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
847        TransactionContext c = getTransactionContext(context);
848        try {
849            getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
850        } catch (SQLException e) {
851            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
852            throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
853        } finally {
854            c.close();
855        }
856    }
857
858    long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException {
859        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
860        TransactionContext c = getTransactionContext(context);
861        try {
862            result = adapter.getStoreSequenceId(c, destination, messageId);
863        } catch (SQLException e) {
864            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
865            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
866        } finally {
867            c.close();
868        }
869        return result;
870    }
871
872    @Override
873    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
874        throw new UnsupportedOperationException();
875    }
876}