001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.sql.Blob; 022import java.sql.Connection; 023import java.sql.PreparedStatement; 024import java.sql.ResultSet; 025import java.sql.SQLException; 026 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.MessageId; 029import org.apache.activemq.command.XATransactionId; 030import org.apache.activemq.store.jdbc.Statements; 031import org.apache.activemq.store.jdbc.TransactionContext; 032import org.apache.activemq.util.ByteArrayOutputStream; 033 034/** 035 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob() 036 * operations. This is a little more involved since to insert a blob you have 037 * to: 038 * 039 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data 040 * value. 041 * 042 * The databases/JDBC drivers that use this adapter are: 043 * <ul> 044 * <li></li> 045 * </ul> 046 * 047 * @org.apache.xbean.XBean element="blobJDBCAdapter" 048 * 049 * 050 */ 051public class BlobJDBCAdapter extends DefaultJDBCAdapter { 052 053 @Override 054 public void setStatements(Statements statements) { 055 056 String addMessageStatement = "INSERT INTO " 057 + statements.getFullMessageTableName() 058 + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())"; 059 statements.setAddMessageStatement(addMessageStatement); 060 061 String findMessageByIdStatement = "SELECT MSG FROM " + 062 statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE"; 063 statements.setFindMessageByIdStatement(findMessageByIdStatement); 064 065 super.setStatements(statements); 066 } 067 068 @Override 069 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 070 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException { 071 PreparedStatement s = null; 072 try { 073 // Add the Blob record. 074 s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); 075 s.setLong(1, sequence); 076 s.setString(2, messageID.getProducerId().toString()); 077 s.setLong(3, messageID.getProducerSequenceId()); 078 s.setString(4, destination.getQualifiedName()); 079 s.setLong(5, expiration); 080 s.setLong(6, priority); 081 082 if (s.executeUpdate() != 1) { 083 throw new IOException("Failed to add broker message: " + messageID + " in container."); 084 } 085 s.close(); 086 087 // Select the blob record so that we can update it. 088 updateBlob(c.getConnection(), statements.getFindMessageByIdStatement(), sequence, data); 089 if (xid != null) { 090 byte[] xidVal = xid.getEncodedXidBytes(); 091 xidVal[0] = '+'; 092 updateBlob(c.getConnection(), statements.getFindXidByIdStatement(), sequence, xidVal); 093 } 094 095 } finally { 096 close(s); 097 } 098 } 099 100 private void updateBlob(Connection connection, String findMessageByIdStatement, long sequence, byte[] data) throws SQLException, IOException { 101 PreparedStatement s = null; 102 ResultSet rs = null; 103 try { 104 s = connection.prepareStatement(statements.getFindMessageByIdStatement(), 105 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); 106 s.setLong(1, sequence); 107 rs = s.executeQuery(); 108 if (!rs.next()) { 109 throw new IOException("Failed select blob for message: " + sequence + " in container."); 110 } 111 112 // Update the blob 113 Blob blob = rs.getBlob(1); 114 blob.truncate(0); 115 blob.setBytes(1, data); 116 rs.updateBlob(1, blob); 117 rs.updateRow(); // Update the row with the updated blob 118 } finally { 119 close(rs); 120 close(s); 121 } 122 } 123 124 @Override 125 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 126 PreparedStatement s = null; 127 ResultSet rs = null; 128 try { 129 130 s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); 131 s.setString(1, id.getProducerId().toString()); 132 s.setLong(2, id.getProducerSequenceId()); 133 rs = s.executeQuery(); 134 135 if (!rs.next()) { 136 return null; 137 } 138 Blob blob = rs.getBlob(1); 139 140 try(InputStream is = blob.getBinaryStream(); 141 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length())) { 142 int ch; 143 while ((ch = is.read()) >= 0) { 144 os.write(ch); 145 } 146 return os.toByteArray(); 147 } 148 } finally { 149 close(rs); 150 close(s); 151 } 152 } 153 154}