001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ActiveMQQueue;
026import org.apache.activemq.command.ActiveMQTopic;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.MessageAck;
029import org.apache.activemq.command.MessageId;
030import org.apache.activemq.command.TransactionId;
031import org.apache.activemq.command.XATransactionId;
032import org.apache.activemq.store.IndexListener;
033import org.apache.activemq.store.MessageStore;
034import org.apache.activemq.store.ProxyMessageStore;
035import org.apache.activemq.store.TopicMessageStore;
036import org.apache.activemq.store.TransactionRecoveryListener;
037import org.apache.activemq.store.memory.MemoryTransactionStore;
038import org.apache.activemq.util.ByteSequence;
039import org.apache.activemq.util.DataByteArrayInputStream;
040
041/**
042 * respect 2pc prepare
043 * uses local transactions to maintain prepared state
044 * xid column provides transaction flag for additions and removals
045 * a commit clears that context and completes the work
046 * a rollback clears the flag and removes the additions
047 * Essentially a prepare is an insert &| update transaction
048 *  commit|rollback is an update &| remove
049 */
050public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
051
052
053    public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
054        super(jdbcPersistenceAdapter);
055    }
056
057    @Override
058    public void prepare(TransactionId txid) throws IOException {
059        Tx tx = inflightTransactions.remove(txid);
060        if (tx == null) {
061            return;
062        }
063
064        ConnectionContext ctx = new ConnectionContext();
065        // setting the xid modifies the add/remove to be pending transaction outcome
066        ctx.setXid((XATransactionId) txid);
067        persistenceAdapter.beginTransaction(ctx);
068        try {
069
070            // Do all the message adds.
071            for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
072                AddMessageCommand cmd = iter.next();
073                cmd.run(ctx);
074            }
075            // And removes..
076            for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext();) {
077                RemoveMessageCommand cmd = iter.next();
078                cmd.run(ctx);
079            }
080
081            persistenceAdapter.commitTransaction(ctx);
082
083        } catch ( IOException e ) {
084            persistenceAdapter.rollbackTransaction(ctx);
085            throw e;
086        }
087
088        ctx.setXid(null);
089        // setup for commit outcome
090        ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>();
091        for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
092            final AddMessageCommand addMessageCommand = iter.next();
093            updateFromPreparedStateCommands.add(new CommitAddOutcome(addMessageCommand));
094        }
095        tx.messages = updateFromPreparedStateCommands;
096        preparedTransactions.put(txid, tx);
097
098    }
099
100
101    class CommitAddOutcome implements AddMessageCommand {
102        final Message message;
103        JDBCMessageStore jdbcMessageStore;
104
105        public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) {
106            this.jdbcMessageStore = jdbcMessageStore;
107            this.message = message;
108        }
109
110        public CommitAddOutcome(AddMessageCommand addMessageCommand) {
111            this((JDBCMessageStore)addMessageCommand.getMessageStore(), addMessageCommand.getMessage());
112        }
113
114        @Override
115        public Message getMessage() {
116            return message;
117        }
118
119        @Override
120        public MessageStore getMessageStore() {
121            return jdbcMessageStore;
122        }
123
124        @Override
125        public void run(final ConnectionContext context) throws IOException {
126            JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
127            final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
128            TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
129
130            long newSequence;
131            synchronized (jdbcMessageStore.pendingAdditions) {
132                newSequence = jdbcPersistenceAdapter.getNextSequenceId();
133                final long sequenceToSet = newSequence;
134                c.onCompletion(new Runnable() {
135                    @Override
136                    public void run() {
137                        message.getMessageId().setEntryLocator(sequenceToSet);
138                        message.getMessageId().setFutureOrSequenceLong(sequenceToSet);
139                    }
140                });
141
142                if (jdbcMessageStore.getIndexListener() != null) {
143                    jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, message, null));
144                }
145            }
146
147            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence, newSequence);
148            jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority());
149        }
150
151        @Override
152        public void setMessageStore(MessageStore messageStore) {
153            jdbcMessageStore = (JDBCMessageStore) messageStore;
154        }
155    }
156
157    @Override
158    public void rollback(TransactionId txid) throws IOException {
159
160        Tx tx = inflightTransactions.remove(txid);
161        if (tx == null) {
162            tx = preparedTransactions.remove(txid);
163            if (tx != null) {
164                // undo prepare work
165                ConnectionContext ctx = new ConnectionContext();
166                persistenceAdapter.beginTransaction(ctx);
167                try {
168
169                    for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext(); ) {
170                        final Message message = iter.next().getMessage();
171                        // need to delete the row
172                        ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
173                    }
174
175                    for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext(); ) {
176                        RemoveMessageCommand removeMessageCommand = iter.next();
177                        if (removeMessageCommand instanceof LastAckCommand ) {
178                            ((LastAckCommand)removeMessageCommand).rollback(ctx);
179                        } else {
180                            MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
181                            long sequence = (Long)messageId.getEntryLocator();
182                            // need to unset the txid flag on the existing row
183                            ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence);
184
185                            if (removeMessageCommand instanceof RecoveredRemoveMessageCommand) {
186                                ((JDBCMessageStore) removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand) removeMessageCommand).getMessage());
187                            }
188                        }
189                    }
190                } catch (IOException e) {
191                    persistenceAdapter.rollbackTransaction(ctx);
192                    throw e;
193                }
194                persistenceAdapter.commitTransaction(ctx);
195            }
196        }
197    }
198
199    @Override
200    public void recover(TransactionRecoveryListener listener) throws IOException {
201        ((JDBCPersistenceAdapter)persistenceAdapter).recover(this);
202        super.recover(listener);
203    }
204
205    public void recoverAdd(long id, byte[] messageBytes) throws IOException {
206        final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
207        message.getMessageId().setFutureOrSequenceLong(id);
208        message.getMessageId().setEntryLocator(id);
209        Tx tx = getPreparedTx(message.getTransactionId());
210        tx.add(new CommitAddOutcome(null, message));
211    }
212
213    public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
214        final Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
215        msg.getMessageId().setFutureOrSequenceLong(id);
216        msg.getMessageId().setEntryLocator(id);
217        Tx tx = getPreparedTx(new XATransactionId(xid));
218        final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
219        tx.add(new RecoveredRemoveMessageCommand() {
220            MessageStore messageStore = null;
221            @Override
222            public MessageAck getMessageAck() {
223                return ack;
224            }
225
226            @Override
227            public void run(ConnectionContext context) throws IOException {
228                ((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack);
229            }
230
231            public Message getMessage() {
232                return msg;
233            }
234
235            @Override
236            public void setMessageStore(MessageStore messageStore) {
237                this.messageStore = messageStore;
238            }
239
240            @Override
241            public MessageStore getMessageStore() {
242                return messageStore;
243            }
244
245        });
246    }
247
248    interface RecoveredRemoveMessageCommand extends RemoveMessageCommand {
249        Message getMessage();
250
251        void setMessageStore(MessageStore messageStore);
252    }
253
254    interface LastAckCommand extends RemoveMessageCommand {
255        void rollback(ConnectionContext context) throws IOException;
256
257        String getClientId();
258
259        String getSubName();
260
261        long getSequence();
262
263        byte getPriority();
264
265        void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
266    }
267
268    public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
269        Tx tx = getPreparedTx(new XATransactionId(encodedXid));
270        DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid);
271        inputStream.skipBytes(1); // +|-
272        final long lastAck = inputStream.readLong();
273        final byte priority = inputStream.readByte();
274        final MessageAck ack = new MessageAck();
275        ack.setDestination(destination);
276        tx.add(new LastAckCommand() {
277            JDBCTopicMessageStore jdbcTopicMessageStore;
278
279            @Override
280            public MessageAck getMessageAck() {
281                return ack;
282            }
283
284            @Override
285            public MessageStore getMessageStore() {
286                return jdbcTopicMessageStore;
287            }
288
289            @Override
290            public void run(ConnectionContext context) throws IOException {
291                ((JDBCPersistenceAdapter)persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
292                jdbcTopicMessageStore.complete(clientId, subName);
293            }
294
295            @Override
296            public void rollback(ConnectionContext context) throws IOException {
297                ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, jdbcTopicMessageStore.isPrioritizedMessages() ? priority : 0, jdbcTopicMessageStore.getDestination(), subName, clientId);
298                jdbcTopicMessageStore.complete(clientId, subName);
299            }
300
301            @Override
302            public String getClientId() {
303                return clientId;
304            }
305
306            @Override
307            public String getSubName() {
308                return subName;
309            }
310
311            @Override
312            public long getSequence() {
313                return lastAck;
314            }
315
316            @Override
317            public byte getPriority() {
318                return priority;
319            }
320
321            @Override
322            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
323                this.jdbcTopicMessageStore = jdbcTopicMessageStore;
324            }
325        });
326
327    }
328
329    @Override
330    protected void onRecovered(Tx tx) {
331        for (RemoveMessageCommand removeMessageCommand: tx.acks) {
332            if (removeMessageCommand instanceof LastAckCommand) {
333                LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
334                JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) findMessageStore(lastAckCommand.getMessageAck().getDestination());
335                jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
336                lastAckCommand.setMessageStore(jdbcTopicMessageStore);
337            } else {
338                ((RecoveredRemoveMessageCommand)removeMessageCommand).setMessageStore(findMessageStore(removeMessageCommand.getMessageAck().getDestination()));
339            }
340        }
341        for (AddMessageCommand addMessageCommand : tx.messages) {
342            addMessageCommand.setMessageStore(findMessageStore(addMessageCommand.getMessage().getDestination()));
343        }
344    }
345
346    private MessageStore findMessageStore(ActiveMQDestination destination) {
347        ProxyMessageStore proxyMessageStore = null;
348        try {
349            if (destination.isQueue()) {
350                proxyMessageStore = (ProxyMessageStore) persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
351            } else {
352                proxyMessageStore = (ProxyMessageStore) persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
353            }
354        } catch (IOException error) {
355            throw new RuntimeException("Failed to find/create message store for destination: " + destination, error);
356        }
357        return proxyMessageStore.getDelegate();
358    }
359
360    @Override
361    public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
362                           final MessageId messageId, final MessageAck ack) throws IOException {
363
364        if (ack.isInTransaction()) {
365            Tx tx = getTx(ack.getTransactionId());
366            tx.add(new LastAckCommand() {
367                public MessageAck getMessageAck() {
368                    return ack;
369                }
370
371                public void run(ConnectionContext ctx) throws IOException {
372                    topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
373                }
374
375                @Override
376                public MessageStore getMessageStore() {
377                    return topicMessageStore;
378                }
379
380                @Override
381                public void rollback(ConnectionContext context) throws IOException {
382                    JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore;
383                    ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context,
384                            jdbcTopicMessageStore,
385                            ack,
386                            subscriptionName, clientId);
387                    jdbcTopicMessageStore.complete(clientId, subscriptionName);
388                }
389
390
391                @Override
392                public String getClientId() {
393                    return clientId;
394                }
395
396                @Override
397                public String getSubName() {
398                    return subscriptionName;
399                }
400
401                @Override
402                public long getSequence() {
403                    throw new IllegalStateException("Sequence id must be inferred from ack");
404                }
405
406                @Override
407                public byte getPriority() {
408                    throw new IllegalStateException("Priority must be inferred from ack or row");
409                }
410
411                @Override
412                public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
413                    throw new IllegalStateException("message store already known!");
414                }
415            });
416        } else {
417            topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
418        }
419    }
420
421}