001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
020import static javax.xml.bind.DatatypeConverter.printBase64Binary;
021
022import java.io.IOException;
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027import java.sql.Statement;
028import java.util.ArrayList;
029import java.util.HashSet;
030import java.util.LinkedList;
031import java.util.Set;
032import java.util.concurrent.locks.ReadWriteLock;
033import java.util.concurrent.locks.ReentrantReadWriteLock;
034
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.ProducerId;
038import org.apache.activemq.command.SubscriptionInfo;
039import org.apache.activemq.command.XATransactionId;
040import org.apache.activemq.store.jdbc.JDBCAdapter;
041import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
042import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
043import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
044import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
045import org.apache.activemq.store.jdbc.Statements;
046import org.apache.activemq.store.jdbc.TransactionContext;
047import org.apache.activemq.util.DataByteArrayOutputStream;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
053 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
054 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
055 * The databases/JDBC drivers that use this adapter are:
056 * <ul>
057 * <li></li>
058 * </ul>
059 * 
060 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
061 * 
062 * 
063 */
064public class DefaultJDBCAdapter implements JDBCAdapter {
065    private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
066    public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
067    private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s SQLState: %s Vendor code: %s";
068    protected Statements statements;
069    private boolean batchStatements = true;
070    //This is deprecated and should be removed in a future release
071    protected boolean batchStatments = true;
072    protected boolean prioritizedMessages;
073    protected int maxRows = MAX_ROWS;
074
075    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
076        s.setBytes(index, data);
077    }
078
079    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
080        return rs.getBytes(index);
081    }
082
083    @Override
084    public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException {
085        // Check to see if the table already exists. If it does, then don't log warnings during startup.
086        // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
087        boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
088
089        for (String createStatement : this.statements.getCreateSchemaStatements()) {
090            // This will fail usually since the tables will be
091            // created already.
092            executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
093        }
094    }
095
096    private boolean messageTableAlreadyExists(TransactionContext transactionContext) {
097        boolean alreadyExists = false;
098        ResultSet rs = null;
099        try {
100            rs = transactionContext.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" });
101            alreadyExists = rs.next();
102        } catch (Throwable ignore) {
103        } finally {
104            close(rs);
105        }
106        return alreadyExists;
107    }
108
109    private void executeStatement(TransactionContext transactionContext, String createStatement, boolean ignoreStatementExecutionFailure) throws IOException {
110        Statement statement = null;
111        try {
112            LOG.debug("Executing SQL: " + createStatement);
113            statement = transactionContext.getConnection().createStatement();
114            statement.execute(createStatement);
115
116            commitIfAutoCommitIsDisabled(transactionContext);
117        } catch (SQLException e) {
118            if (ignoreStatementExecutionFailure) {
119                LOG.debug("Could not create JDBC tables; The message table already existed. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode()));
120            } else {
121                LOG.warn("Could not create JDBC tables; they could already exist. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode()));
122                JDBCPersistenceAdapter.log("Failure details: ", e);
123            }
124        } finally {
125            closeStatement(statement);
126        }
127    }
128
129    private void closeStatement(Statement statement) {
130        try {
131            if (statement != null) {
132                statement.close();
133            }
134        } catch (SQLException ignored) {}
135    }
136
137    private void commitIfAutoCommitIsDisabled(TransactionContext c) throws SQLException, IOException {
138        if (!c.getConnection().getAutoCommit()) {
139            c.getConnection().commit();
140        }
141    }
142
143    public void doDropTables(TransactionContext c) throws SQLException, IOException {
144        Statement s = null;
145        try {
146            s = c.getConnection().createStatement();
147            String[] dropStatments = this.statements.getDropSchemaStatements();
148            for (int i = 0; i < dropStatments.length; i++) {
149                // This will fail usually since the tables will be
150                // created already.
151                try {
152                    LOG.debug("Executing SQL: " + dropStatments[i]);
153                    s.execute(dropStatments[i]);
154                } catch (SQLException e) {
155                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
156                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
157                            + e.getErrorCode());
158                    JDBCPersistenceAdapter.log("Failure details: ", e);
159                }
160            }
161            commitIfAutoCommitIsDisabled(c);
162        } finally {
163            try {
164                s.close();
165            } catch (Throwable e) {
166            }
167        }
168    }
169
170    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
171        PreparedStatement s = null;
172        ResultSet rs = null;
173        try {
174            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
175            rs = s.executeQuery();
176            long seq1 = 0;
177            if (rs.next()) {
178                seq1 = rs.getLong(1);
179            }
180            rs.close();
181            s.close();
182            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
183            rs = s.executeQuery();
184            long seq2 = 0;
185            if (rs.next()) {
186                seq2 = rs.getLong(1);
187            }
188            long seq = Math.max(seq1, seq2);
189            return seq;
190        } finally {
191            close(rs);
192            close(s);
193        }
194    }
195    
196    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
197        PreparedStatement s = null;
198        ResultSet rs = null;
199        try {
200            s = c.getConnection().prepareStatement(
201                    this.statements.getFindMessageByIdStatement());
202            s.setLong(1, storeSequenceId);
203            rs = s.executeQuery();
204            if (!rs.next()) {
205                return null;
206            }
207            return getBinaryData(rs, 1);
208        } finally {
209            close(rs);
210            close(s);
211        }
212    }
213
214
215    /**
216     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
217     */
218    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
219                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
220        PreparedStatement s = c.getAddMessageStatement();
221        try {
222            if (s == null) {
223                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
224                if (this.batchStatements) {
225                    c.setAddMessageStatement(s);
226                }
227            }
228            s.setLong(1, sequence);
229            s.setString(2, messageID.getProducerId().toString());
230            s.setLong(3, messageID.getProducerSequenceId());
231            s.setString(4, destination.getQualifiedName());
232            s.setLong(5, expiration);
233            s.setLong(6, priority);
234            setBinaryData(s, 7, data);
235            if (xid != null) {
236                byte[] xidVal = xid.getEncodedXidBytes();
237                xidVal[0] = '+';
238                String xidString = printBase64Binary(xidVal);
239                s.setString(8, xidString);
240            } else {
241                s.setString(8, null);
242            }
243            if (this.batchStatements) {
244                s.addBatch();
245            } else if (s.executeUpdate() != 1) {
246                throw new SQLException("Failed add a message");
247            }
248        } finally {
249            if (!this.batchStatements) {
250                if (s != null) {
251                    s.close();
252                }
253            }
254        }
255    }
256
257    @Override
258    public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
259        PreparedStatement s = null;
260        try {
261            s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
262            setBinaryData(s, 1, data);
263            s.setString(2, id.getProducerId().toString());
264            s.setLong(3, id.getProducerSequenceId());
265            s.setString(4, destination.getQualifiedName());
266            if (s.executeUpdate() != 1) {
267                throw new IOException("Could not update message: " + id + " in " + destination);
268            }
269        } finally {
270            close(s);
271        }
272    }
273
274
275    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
276            long expirationTime, String messageRef) throws SQLException, IOException {
277        PreparedStatement s = c.getAddMessageStatement();
278        try {
279            if (s == null) {
280                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
281                if (this.batchStatements) {
282                    c.setAddMessageStatement(s);
283                }
284            }
285            s.setLong(1, messageID.getBrokerSequenceId());
286            s.setString(2, messageID.getProducerId().toString());
287            s.setLong(3, messageID.getProducerSequenceId());
288            s.setString(4, destination.getQualifiedName());
289            s.setLong(5, expirationTime);
290            s.setString(6, messageRef);
291            if (this.batchStatements) {
292                s.addBatch();
293            } else if (s.executeUpdate() != 1) {
294                throw new SQLException("Failed add a message");
295            }
296        } finally {
297            if (!this.batchStatements) {
298                s.close();
299            }
300        }
301    }
302
303    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
304        PreparedStatement s = null;
305        ResultSet rs = null;
306        try {
307            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
308            s.setString(1, messageID.getProducerId().toString());
309            s.setLong(2, messageID.getProducerSequenceId());
310            s.setString(3, destination.getQualifiedName());
311            rs = s.executeQuery();
312            if (!rs.next()) {
313                return new long[]{0,0};
314            }
315            return new long[]{rs.getLong(1), rs.getLong(2)};
316        } finally {
317            close(rs);
318            close(s);
319        }
320    }
321
322    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
323        PreparedStatement s = null;
324        ResultSet rs = null;
325        try {
326            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
327            s.setString(1, id.getProducerId().toString());
328            s.setLong(2, id.getProducerSequenceId());
329            rs = s.executeQuery();
330            if (!rs.next()) {
331                return null;
332            }
333            return getBinaryData(rs, 1);
334        } finally {
335            close(rs);
336            close(s);
337        }
338    }
339
340    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
341        PreparedStatement s = null;
342        ResultSet rs = null;
343        try {
344            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
345            s.setLong(1, seq);
346            rs = s.executeQuery();
347            if (!rs.next()) {
348                return null;
349            }
350            return rs.getString(1);
351        } finally {
352            close(rs);
353            close(s);
354        }
355    }
356
357    /**
358     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
359     */
360    public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
361        PreparedStatement s = c.getRemovedMessageStatement();
362        try {
363            if (s == null) {
364                s = c.getConnection().prepareStatement(xid == null ?
365                        this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
366                if (this.batchStatements) {
367                    c.setRemovedMessageStatement(s);
368                }
369            }
370            if (xid == null) {
371                s.setLong(1, seq);
372            } else {
373                byte[] xidVal = xid.getEncodedXidBytes();
374                xidVal[0] = '-';
375                String xidString = printBase64Binary(xidVal);
376                s.setString(1, xidString);
377                s.setLong(2, seq);
378            }
379            if (this.batchStatements) {
380                s.addBatch();
381            } else if (s.executeUpdate() != 1) {
382                throw new SQLException("Failed to remove message seq: " + seq);
383            }
384        } finally {
385            if (!this.batchStatements && s != null) {
386                s.close();
387            }
388        }
389    }
390
391    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
392            throws Exception {
393        PreparedStatement s = null;
394        ResultSet rs = null;
395        try {
396            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
397            s.setString(1, destination.getQualifiedName());
398            rs = s.executeQuery();
399            if (this.statements.isUseExternalMessageReferences()) {
400                while (rs.next()) {
401                    if (!listener.recoverMessageReference(rs.getString(2))) {
402                        break;
403                    }
404                }
405            } else {
406                while (rs.next()) {
407                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
408                        break;
409                    }
410                }
411            }
412        } finally {
413            close(rs);
414            close(s);
415        }
416    }
417
418    public void doMessageIdScan(TransactionContext c, int limit, 
419            JDBCMessageIdScanListener listener) throws SQLException, IOException {
420        PreparedStatement s = null;
421        ResultSet rs = null;
422        try {
423            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
424            s.setMaxRows(limit);
425            rs = s.executeQuery();
426            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
427            LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
428            while (rs.next()) {
429                reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
430            }
431            if (LOG.isDebugEnabled()) {
432                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
433            }
434            for (MessageId id : reverseOrderIds) {
435                listener.messageId(id);
436            }
437        } finally {
438            close(rs);
439            close(s);
440        }
441    }
442    
443    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
444                                         String subscriptionName, long seq, long priority) throws SQLException, IOException {
445        PreparedStatement s = c.getUpdateLastAckStatement();
446        try {
447            if (s == null) {
448                s = c.getConnection().prepareStatement(xid == null ?
449                        this.statements.getUpdateDurableLastAckWithPriorityStatement() :
450                        this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
451                if (this.batchStatements) {
452                    c.setUpdateLastAckStatement(s);
453                }
454            }
455            if (xid != null) {
456                byte[] xidVal = encodeXid(xid, seq, priority);
457                String xidString = printBase64Binary(xidVal);
458                s.setString(1, xidString);
459            } else {
460                s.setLong(1, seq);
461            }
462            s.setString(2, destination.getQualifiedName());
463            s.setString(3, clientId);
464            s.setString(4, subscriptionName);
465            s.setLong(5, priority);
466            if (this.batchStatements) {
467                s.addBatch();
468            } else if (s.executeUpdate() != 1) {
469                throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
470            }
471        } finally {
472            if (!this.batchStatements) {
473                close(s);
474            }
475        }
476    }
477
478
479    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
480                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
481        PreparedStatement s = c.getUpdateLastAckStatement();
482        try {
483            if (s == null) {
484                s = c.getConnection().prepareStatement(xid == null ?
485                        this.statements.getUpdateDurableLastAckStatement() :
486                        this.statements.getUpdateDurableLastAckInTxStatement());
487                if (this.batchStatements) {
488                    c.setUpdateLastAckStatement(s);
489                }
490            }
491            if (xid != null) {
492                byte[] xidVal = encodeXid(xid, seq, priority);
493                String xidString = printBase64Binary(xidVal);
494                s.setString(1, xidString);
495            } else {
496                s.setLong(1, seq);
497            }
498            s.setString(2, destination.getQualifiedName());
499            s.setString(3, clientId);
500            s.setString(4, subscriptionName);
501
502            if (this.batchStatements) {
503                s.addBatch();
504            } else if (s.executeUpdate() != 1) {
505                throw new IOException("Could not update last ack seq : "
506                            + seq + ", for sub: " + subscriptionName);
507            }
508        } finally {
509            if (!this.batchStatements) {
510                close(s);
511            }            
512        }
513    }
514
515    private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
516        byte[] xidVal = xid.getEncodedXidBytes();
517        // encode the update
518        DataByteArrayOutputStream outputStream = xid.internalOutputStream();
519        outputStream.position(1);
520        outputStream.writeLong(seq);
521        outputStream.writeByte(Long.valueOf(priority).byteValue());
522        return xidVal;
523    }
524
525    @Override
526    public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
527        PreparedStatement s = null;
528        try {
529            s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
530            s.setString(1, destination.getQualifiedName());
531            s.setString(2, clientId);
532            s.setString(3, subName);
533            s.setLong(4, priority);
534            if (s.executeUpdate() != 1) {
535                throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
536            }
537        } finally {
538            close(s);
539        }
540    }
541
542    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
543            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
544        // dumpTables(c,
545        // destination.getQualifiedName(),clientId,subscriptionName);
546        PreparedStatement s = null;
547        ResultSet rs = null;
548        try {
549            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
550            s.setString(1, destination.getQualifiedName());
551            s.setString(2, clientId);
552            s.setString(3, subscriptionName);
553            rs = s.executeQuery();
554            if (this.statements.isUseExternalMessageReferences()) {
555                while (rs.next()) {
556                    if (!listener.recoverMessageReference(rs.getString(2))) {
557                        break;
558                    }
559                }
560            } else {
561                while (rs.next()) {
562                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
563                        break;
564                    }
565                }
566            }
567        } finally {
568            close(rs);
569            close(s);
570        }
571    }
572
573    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
574            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
575        
576        PreparedStatement s = null;
577        ResultSet rs = null;
578        try {
579            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
580            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
581            s.setString(1, destination.getQualifiedName());
582            s.setString(2, clientId);
583            s.setString(3, subscriptionName);
584            s.setLong(4, seq);
585            rs = s.executeQuery();
586            int count = 0;
587            if (this.statements.isUseExternalMessageReferences()) {
588                while (rs.next() && count < maxReturned) {
589                    if (listener.recoverMessageReference(rs.getString(1))) {
590                        count++;
591                    }
592                }
593            } else {
594                while (rs.next() && count < maxReturned) {
595                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
596                        count++;
597                    }
598                }
599            }
600        } finally {
601            close(rs);
602            close(s);
603        }
604    }
605
606    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
607            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
608
609        PreparedStatement s = null;
610        ResultSet rs = null;
611        try {
612            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
613            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
614            s.setString(1, destination.getQualifiedName());
615            s.setString(2, clientId);
616            s.setString(3, subscriptionName);
617            s.setLong(4, seq);
618            s.setLong(5, priority);
619            rs = s.executeQuery();
620            int count = 0;
621            if (this.statements.isUseExternalMessageReferences()) {
622                while (rs.next() && count < maxReturned) {
623                    if (listener.recoverMessageReference(rs.getString(1))) {
624                        count++;
625                    }
626                }
627            } else {
628                while (rs.next() && count < maxReturned) {
629                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
630                        count++;
631                    }
632                }
633            }
634        } finally {
635            close(rs);
636            close(s);
637        }
638    }
639
640    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
641            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
642        PreparedStatement s = null;
643        ResultSet rs = null;
644        int result = 0;
645        try {
646            if (isPrioritizedMessages) {
647                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
648            } else {
649                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
650            }
651            s.setString(1, destination.getQualifiedName());
652            s.setString(2, clientId);
653            s.setString(3, subscriptionName);
654            rs = s.executeQuery();
655            if (rs.next()) {
656                result = rs.getInt(1);
657            }
658        } finally {
659            close(rs);
660            close(s);
661        }
662        return result;
663    }
664
665    /**
666     * @param c 
667     * @param info 
668     * @param retroactive 
669     * @throws SQLException 
670     * @throws IOException 
671     */
672    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
673            throws SQLException, IOException {
674        // dumpTables(c, destination.getQualifiedName(), clientId,
675        // subscriptionName);
676        PreparedStatement s = null;
677        try {
678            long lastMessageId = -1;
679            if (!retroactive) {
680                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
681                ResultSet rs = null;
682                try {
683                    rs = s.executeQuery();
684                    if (rs.next()) {
685                        lastMessageId = rs.getLong(1);
686                    }
687                } finally {
688                    close(rs);
689                    close(s);
690                }
691            }
692            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
693            int maxPriority = 1;
694            if (isPrioritizedMessages) {
695                maxPriority = 10;
696            }
697
698            for (int priority = 0; priority < maxPriority; priority++) {
699                s.setString(1, info.getDestination().getQualifiedName());
700                s.setString(2, info.getClientId());
701                s.setString(3, info.getSubscriptionName());
702                s.setString(4, info.getSelector());
703                s.setLong(5, lastMessageId);
704                s.setString(6, info.getSubscribedDestination().getQualifiedName());
705                s.setLong(7, priority);
706
707                if (s.executeUpdate() != 1) {
708                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
709                }
710            }
711
712        } finally {
713            close(s);
714        }
715    }
716
717    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
718            String clientId, String subscriptionName) throws SQLException, IOException {
719        PreparedStatement s = null;
720        ResultSet rs = null;
721        try {
722            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
723            s.setString(1, destination.getQualifiedName());
724            s.setString(2, clientId);
725            s.setString(3, subscriptionName);
726            rs = s.executeQuery();
727            if (!rs.next()) {
728                return null;
729            }
730            SubscriptionInfo subscription = new SubscriptionInfo();
731            subscription.setDestination(destination);
732            subscription.setClientId(clientId);
733            subscription.setSubscriptionName(subscriptionName);
734            subscription.setSelector(rs.getString(1));
735            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
736                    ActiveMQDestination.QUEUE_TYPE));
737            return subscription;
738        } finally {
739            close(rs);
740            close(s);
741        }
742    }
743
744    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
745            throws SQLException, IOException {
746        PreparedStatement s = null;
747        ResultSet rs = null;
748        try {
749            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
750            s.setString(1, destination.getQualifiedName());
751            rs = s.executeQuery();
752            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
753            while (rs.next()) {
754                SubscriptionInfo subscription = new SubscriptionInfo();
755                subscription.setDestination(destination);
756                subscription.setSelector(rs.getString(1));
757                subscription.setSubscriptionName(rs.getString(2));
758                subscription.setClientId(rs.getString(3));
759                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
760                        ActiveMQDestination.QUEUE_TYPE));
761                rc.add(subscription);
762            }
763            return rc.toArray(new SubscriptionInfo[rc.size()]);
764        } finally {
765            close(rs);
766            close(s);
767        }
768    }
769
770    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
771            IOException {
772        PreparedStatement s = null;
773        try {
774            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
775            s.setString(1, destinationName.getQualifiedName());
776            s.executeUpdate();
777            s.close();
778            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
779            s.setString(1, destinationName.getQualifiedName());
780            s.executeUpdate();
781        } finally {
782            close(s);
783        }
784    }
785
786    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
787            String subscriptionName) throws SQLException, IOException {
788        PreparedStatement s = null;
789        try {
790            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
791            s.setString(1, destination.getQualifiedName());
792            s.setString(2, clientId);
793            s.setString(3, subscriptionName);
794            s.executeUpdate();
795        } finally {
796            close(s);
797        }
798    }
799
800    char priorityIterator = 0; // unsigned
801    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
802        PreparedStatement s = null;
803        try {
804            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
805            s = c.getExclusiveConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
806            int priority = priorityIterator++%10;
807            s.setInt(1, priority);
808            s.setInt(2, priority);
809            int i = s.executeUpdate();
810            LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
811        } finally {
812            close(s);
813        }
814    }
815
816    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
817            String clientId, String subscriberName) throws SQLException, IOException {
818        PreparedStatement s = null;
819        ResultSet rs = null;
820        long result = -1;
821        try {
822            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
823            s.setString(1, destination.getQualifiedName());
824            s.setString(2, clientId);
825            s.setString(3, subscriberName);
826            rs = s.executeQuery();
827            if (rs.next()) {
828                result = rs.getLong(1);
829                if (result == 0 && rs.wasNull()) {
830                    result = -1;
831                }
832            }
833        } finally {
834            close(rs);
835            close(s);
836        }
837        return result;
838    }
839
840    protected static void close(PreparedStatement s) {
841        try {
842            s.close();
843        } catch (Throwable e) {
844        }
845    }
846
847    protected static void close(ResultSet rs) {
848        try {
849            rs.close();
850        } catch (Throwable e) {
851        }
852    }
853
854    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
855        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
856        PreparedStatement s = null;
857        ResultSet rs = null;
858        try {
859            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
860            rs = s.executeQuery();
861            while (rs.next()) {
862                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
863            }
864        } finally {
865            close(rs);
866            close(s);
867        }
868        return rc;
869    }
870
871    /**
872     * @return true if batchStatements
873     */
874    public boolean isBatchStatements() {
875        return batchStatements;
876    }
877
878    /**
879     * Set the number of statements to process as a single batch DB update
880     * @param batchStatements
881     */
882    public void setBatchStatements(boolean batchStatements) {
883        this.batchStatements = batchStatements;
884        // The next lines are deprecated and should be removed in a future release
885        // and is here in case someone created their own
886       // this.batchStatments = batchStatements;
887    }
888
889    // Note - remove batchStatment in future distributions.  Here for backward compatibility
890    /**
891     * @return true if batchStements
892     */
893    public boolean isBatchStatments() {
894        return this.batchStatements;
895    }
896
897    /**
898     * This value batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)"
899     * @deprecated
900     * @param batchStatments
901     */
902    public void setBatchStatments(boolean batchStatments) {
903        LOG.warn("batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)");
904        this.batchStatements = batchStatments;
905        this.batchStatments = batchStatments;
906    }
907
908    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
909        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
910    }
911
912    /**
913     * @return the statements
914     */
915    public Statements getStatements() {
916        return this.statements;
917    }
918
919    public void setStatements(Statements statements) {
920        this.statements = statements;
921    }
922
923    public int getMaxRows() {
924        return maxRows;
925    }
926
927    /**
928     * the max value for statement maxRows, used to limit jdbc queries
929     */
930    public void setMaxRows(int maxRows) {
931        this.maxRows = maxRows;
932    }
933
934    @Override
935    public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
936        PreparedStatement s = null;
937        try {
938            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
939            s.setString(1, destination.getQualifiedName());
940            s.setString(2, destination.getQualifiedName());
941            s.setString(3, destination.getQualifiedName());
942            s.setString(4, null);
943            s.setLong(5, 0);
944            s.setString(6, destination.getQualifiedName());
945            s.setLong(7, 11);  // entry out of priority range
946
947            if (s.executeUpdate() != 1) {
948                throw new IOException("Could not create ack record for destination: " + destination);
949            }
950        } finally {
951            close(s);
952        }
953    }
954
955    @Override
956    public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
957        PreparedStatement s = null;
958        ResultSet rs = null;
959        try {
960            s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
961            rs = s.executeQuery();
962            while (rs.next()) {
963                long id = rs.getLong(1);
964                String encodedString = rs.getString(2);
965                byte[] encodedXid = parseBase64Binary(encodedString);
966                if (encodedXid[0] == '+') {
967                    jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
968                } else {
969                    jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
970                }
971            }
972
973            close(rs);
974            close(s);
975
976            s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
977            rs = s.executeQuery();
978            while (rs.next()) {
979                String encodedString = rs.getString(1);
980                byte[] encodedXid = parseBase64Binary(encodedString);
981                String destination = rs.getString(2);
982                String subName = rs.getString(3);
983                String subId = rs.getString(4);
984                jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
985                        ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
986                        subName, subId);
987            }
988        } finally {
989            close(rs);
990            close(s);
991        }
992    }
993
994    @Override
995    public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
996        PreparedStatement s = null;
997        try {
998            s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
999            s.setLong(1, sequence);
1000            s.setLong(2, preparedSequence);
1001            if (s.executeUpdate() != 1) {
1002                throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
1003            }
1004        } finally {
1005            close(s);
1006        }
1007    }
1008
1009
1010    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1011            IOException {
1012        PreparedStatement s = null;
1013        ResultSet rs = null;
1014        int result = 0;
1015        try {
1016            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1017            s.setString(1, destination.getQualifiedName());
1018            rs = s.executeQuery();
1019            if (rs.next()) {
1020                result = rs.getInt(1);
1021            }
1022        } finally {
1023            close(rs);
1024            close(s);
1025        }
1026        return result;
1027    }
1028
1029    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
1030            long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1031        PreparedStatement s = null;
1032        ResultSet rs = null;
1033        try {
1034            if (isPrioritizedMessages) {
1035                s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesByPriorityStatement()));
1036            } else {
1037                s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesStatement()));
1038            }
1039            s.setMaxRows(Math.min(maxReturned, maxRows));
1040            s.setString(1, destination.getQualifiedName());
1041            s.setLong(2, maxSeq);
1042            int paramId = 3;
1043            if (isPrioritizedMessages) {
1044                for (int i=9;i>=0;i--) {
1045                    s.setLong(paramId++, lastRecoveredEntries[i]);
1046                }
1047            } else {
1048                s.setLong(paramId, lastRecoveredEntries[0]);
1049            }
1050            rs = s.executeQuery();
1051            int count = 0;
1052            if (this.statements.isUseExternalMessageReferences()) {
1053                while (rs.next() && count < maxReturned) {
1054                    if (listener.recoverMessageReference(rs.getString(1))) {
1055                        count++;
1056                    } else {
1057                        LOG.debug("Stopped recover next messages");
1058                        break;
1059                    }
1060                }
1061            } else {
1062                while (rs.next() && count < maxReturned) {
1063                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1064                        count++;
1065                    } else {
1066                        LOG.debug("Stopped recover next messages");
1067                        break;
1068                    }
1069                }
1070            }
1071        } catch (Exception e) {
1072            e.printStackTrace();
1073        } finally {
1074            close(rs);
1075            close(s);
1076        }
1077    }
1078
1079    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1080            throws SQLException, IOException {
1081        PreparedStatement s = null;
1082        ResultSet rs = null;
1083        try {
1084            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1085            s.setString(1, id.toString());
1086            rs = s.executeQuery();
1087            long seq = -1;
1088            if (rs.next()) {
1089                seq = rs.getLong(1);
1090            }
1091            return seq;
1092        } finally {
1093            close(rs);
1094            close(s);
1095        }
1096    }
1097
1098    public static void dumpTables(Connection c, String destinationName, String clientId, String
1099      subscriptionName) throws SQLException { 
1100        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
1101        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
1102        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
1103                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
1104                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
1105                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
1106                + " ORDER BY M.ID");
1107      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1108      printQuery(s,System.out); }
1109
1110    public static void dumpTables(java.sql.Connection c) throws SQLException {
1111        printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
1112
1113        //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out);
1114
1115        //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1116        //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1117    }
1118
1119    public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1120            throws SQLException {
1121        printQuery(c.prepareStatement(query), out);
1122    }
1123
1124    public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1125            throws SQLException {
1126
1127        ResultSet set = null;
1128        try {
1129            set = s.executeQuery();
1130            java.sql.ResultSetMetaData metaData = set.getMetaData();
1131            for (int i = 1; i <= metaData.getColumnCount(); i++) {
1132                if (i == 1)
1133                    out.print("||");
1134                out.print(metaData.getColumnName(i) + "||");
1135            }
1136            out.println();
1137            while (set.next()) {
1138                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1139                    if (i == 1)
1140                        out.print("|");
1141                    out.print(set.getString(i) + "|");
1142                }
1143                out.println();
1144            }
1145        } finally {
1146            try {
1147                set.close();
1148            } catch (Throwable ignore) {
1149            }
1150            try {
1151                s.close();
1152            } catch (Throwable ignore) {
1153            }
1154        }
1155    }
1156
1157    @Override
1158    public String limitQuery(String query) {
1159        return query;
1160    }
1161}