001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.sql.Blob;
022import java.sql.Connection;
023import java.sql.PreparedStatement;
024import java.sql.ResultSet;
025import java.sql.SQLException;
026
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.MessageId;
029import org.apache.activemq.command.XATransactionId;
030import org.apache.activemq.store.jdbc.Statements;
031import org.apache.activemq.store.jdbc.TransactionContext;
032import org.apache.activemq.util.ByteArrayOutputStream;
033
034/**
035 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
036 * operations. This is a little more involved since to insert a blob you have
037 * to:
038 *
039 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
040 * value.
041 *
042 * The databases/JDBC drivers that use this adapter are:
043 * <ul>
044 * <li></li>
045 * </ul>
046 *
047 * @org.apache.xbean.XBean element="blobJDBCAdapter"
048 *
049 *
050 */
051public class BlobJDBCAdapter extends DefaultJDBCAdapter {
052
053    @Override
054    public void setStatements(Statements statements) {
055
056        String addMessageStatement = "INSERT INTO "
057            + statements.getFullMessageTableName()
058            + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())";
059        statements.setAddMessageStatement(addMessageStatement);
060
061        String findMessageByIdStatement = "SELECT MSG FROM " +
062                statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
063        statements.setFindMessageByIdStatement(findMessageByIdStatement);
064
065        super.setStatements(statements);
066    }
067
068    @Override
069    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
070                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
071        PreparedStatement s = null;
072        try {
073            // Add the Blob record.
074            s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
075            s.setLong(1, sequence);
076            s.setString(2, messageID.getProducerId().toString());
077            s.setLong(3, messageID.getProducerSequenceId());
078            s.setString(4, destination.getQualifiedName());
079            s.setLong(5, expiration);
080            s.setLong(6, priority);
081
082            if (s.executeUpdate() != 1) {
083                throw new IOException("Failed to add broker message: " + messageID + " in container.");
084            }
085            s.close();
086
087            // Select the blob record so that we can update it.
088            updateBlob(c.getConnection(), statements.getFindMessageByIdStatement(), sequence, data);
089            if (xid != null) {
090                byte[] xidVal = xid.getEncodedXidBytes();
091                xidVal[0] = '+';
092                updateBlob(c.getConnection(), statements.getFindXidByIdStatement(), sequence, xidVal);
093            }
094
095        } finally {
096            close(s);
097        }
098    }
099
100    private void updateBlob(Connection connection, String findMessageByIdStatement, long sequence, byte[] data) throws SQLException, IOException {
101        PreparedStatement s = null;
102        ResultSet rs = null;
103        try {
104            s = connection.prepareStatement(statements.getFindMessageByIdStatement(),
105                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
106            s.setLong(1, sequence);
107            rs = s.executeQuery();
108            if (!rs.next()) {
109                throw new IOException("Failed select blob for message: " + sequence + " in container.");
110            }
111
112            // Update the blob
113            Blob blob = rs.getBlob(1);
114            blob.truncate(0);
115            blob.setBytes(1, data);
116            rs.updateBlob(1, blob);
117            rs.updateRow();             // Update the row with the updated blob
118        } finally {
119            close(rs);
120            close(s);
121        }
122    }
123
124    @Override
125    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
126        PreparedStatement s = null;
127        ResultSet rs = null;
128        try {
129
130            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
131            s.setString(1, id.getProducerId().toString());
132            s.setLong(2, id.getProducerSequenceId());
133            rs = s.executeQuery();
134
135            if (!rs.next()) {
136                return null;
137            }
138            Blob blob = rs.getBlob(1);
139
140            try(InputStream is = blob.getBinaryStream();
141                ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length())) {
142                int ch;
143                while ((ch = is.read()) >= 0) {
144                    os.write(ch);
145                }
146                return os.toByteArray();
147            }
148        } finally {
149            close(rs);
150            close(s);
151        }
152    }
153
154}