001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CancellationException;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.Future;
031
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageId;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.command.XATransactionId;
038import org.apache.activemq.protobuf.Buffer;
039import org.apache.activemq.store.AbstractMessageStore;
040import org.apache.activemq.store.ListenableFuture;
041import org.apache.activemq.store.MessageStore;
042import org.apache.activemq.store.ProxyMessageStore;
043import org.apache.activemq.store.ProxyTopicMessageStore;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.store.TransactionRecoveryListener;
046import org.apache.activemq.store.TransactionStore;
047import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
048import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052import org.apache.activemq.wireformat.WireFormat;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Provides a TransactionStore implementation that can create transaction aware
058 * MessageStore objects from non transaction aware MessageStore objects.
059 *
060 *
061 */
062public class KahaDBTransactionStore implements TransactionStore {
063    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
064    ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
065    private final KahaDBStore theStore;
066
067    public KahaDBTransactionStore(KahaDBStore theStore) {
068        this.theStore = theStore;
069    }
070
071    private WireFormat wireFormat(){
072      return this.theStore.wireFormat;
073    }
074
075    public class Tx {
076        private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
077
078        private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
079
080        public void add(AddMessageCommand msg) {
081            messages.add(msg);
082        }
083
084        public void add(RemoveMessageCommand ack) {
085            acks.add(ack);
086        }
087
088        public Message[] getMessages() {
089            Message rc[] = new Message[messages.size()];
090            int count = 0;
091            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
092                AddMessageCommand cmd = iter.next();
093                rc[count++] = cmd.getMessage();
094            }
095            return rc;
096        }
097
098        public MessageAck[] getAcks() {
099            MessageAck rc[] = new MessageAck[acks.size()];
100            int count = 0;
101            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102                RemoveMessageCommand cmd = iter.next();
103                rc[count++] = cmd.getMessageAck();
104            }
105            return rc;
106        }
107
108        /**
109         * @return true if something to commit
110         * @throws IOException
111         */
112        public List<Future<Object>> commit() throws IOException {
113            List<Future<Object>> results = new ArrayList<Future<Object>>();
114            // Do all the message adds.
115            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
116                AddMessageCommand cmd = iter.next();
117                results.add(cmd.run());
118
119            }
120            // And removes..
121            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
122                RemoveMessageCommand cmd = iter.next();
123                cmd.run();
124                results.add(cmd.run());
125            }
126
127            return results;
128        }
129    }
130
131    public abstract class AddMessageCommand {
132        private final ConnectionContext ctx;
133        AddMessageCommand(ConnectionContext ctx) {
134            this.ctx = ctx;
135        }
136        abstract Message getMessage();
137        Future<Object> run() throws IOException {
138            return run(this.ctx);
139        }
140        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
141    }
142
143    public abstract class RemoveMessageCommand {
144
145        private final ConnectionContext ctx;
146        RemoveMessageCommand(ConnectionContext ctx) {
147            this.ctx = ctx;
148        }
149        abstract MessageAck getMessageAck();
150        Future<Object> run() throws IOException {
151            return run(this.ctx);
152        }
153        abstract Future<Object> run(ConnectionContext context) throws IOException;
154    }
155
156    public MessageStore proxy(MessageStore messageStore) {
157        return new ProxyMessageStore(messageStore) {
158            @Override
159            public void addMessage(ConnectionContext context, final Message send) throws IOException {
160                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
161            }
162
163            @Override
164            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
165                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
166            }
167
168            @Override
169            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
170                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
171            }
172
173            @Override
174            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
175                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
176            }
177
178            @Override
179            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
180                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
181            }
182
183            @Override
184            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
185                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
186            }
187        };
188    }
189
190    public TopicMessageStore proxy(TopicMessageStore messageStore) {
191        return new ProxyTopicMessageStore(messageStore) {
192            @Override
193            public void addMessage(ConnectionContext context, final Message send) throws IOException {
194                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
195            }
196
197            @Override
198            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
199                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
200            }
201
202            @Override
203            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
204                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
205            }
206
207            @Override
208            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
209                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
210            }
211
212            @Override
213            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
214                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
215            }
216
217            @Override
218            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
219                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
220            }
221
222            @Override
223            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
224                            MessageId messageId, MessageAck ack) throws IOException {
225                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
226                        subscriptionName, messageId, ack);
227            }
228
229        };
230    }
231
232    /**
233     * @throws IOException
234     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
235     */
236    @Override
237    public void prepare(TransactionId txid) throws IOException {
238        KahaTransactionInfo info = getTransactionInfo(txid);
239        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
240            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
241        } else {
242            Tx tx = inflightTransactions.remove(txid);
243            if (tx != null) {
244               theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
245            }
246        }
247    }
248
249    public Tx getTx(Object txid) {
250        Tx tx = inflightTransactions.get(txid);
251        if (tx == null) {
252            synchronized (inflightTransactions) {
253                tx = inflightTransactions.get(txid);
254                if (tx == null) {
255                    tx = new Tx();
256                    inflightTransactions.put(txid, tx);
257                }
258            }
259        }
260        return tx;
261    }
262
263    @Override
264    public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
265            throws IOException {
266        if (txid != null) {
267            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
268                if (preCommit != null) {
269                    preCommit.run();
270                }
271                Tx tx = inflightTransactions.remove(txid);
272                if (tx != null) {
273                    List<Future<Object>> results = tx.commit();
274                    boolean doneSomething = false;
275                    for (Future<Object> result : results) {
276                        try {
277                            result.get();
278                        } catch (InterruptedException e) {
279                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
280                        } catch (ExecutionException e) {
281                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
282                        }catch(CancellationException e) {
283                        }
284                        if (!result.isCancelled()) {
285                            doneSomething = true;
286                        }
287                    }
288                    if (postCommit != null) {
289                        postCommit.run();
290                    }
291                    if (doneSomething) {
292                        KahaTransactionInfo info = getTransactionInfo(txid);
293                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
294                    }
295                }else {
296                    //The Tx will be null for failed over clients - lets run their post commits
297                    if (postCommit != null) {
298                        postCommit.run();
299                    }
300                }
301
302            } else {
303                KahaTransactionInfo info = getTransactionInfo(txid);
304                if (preCommit != null) {
305                    preCommit.run();
306                }
307                theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
308                forgetRecoveredAcks(txid, false);
309            }
310        }else {
311           LOG.error("Null transaction passed on commit");
312        }
313    }
314
315    /**
316     * @throws IOException
317     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
318     */
319    @Override
320    public void rollback(TransactionId txid) throws IOException {
321        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
322            KahaTransactionInfo info = getTransactionInfo(txid);
323            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
324            forgetRecoveredAcks(txid, true);
325        } else {
326            inflightTransactions.remove(txid);
327        }
328    }
329
330    protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
331        if (txid.isXATransaction()) {
332            XATransactionId xaTid = ((XATransactionId) txid);
333            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback);
334        }
335    }
336
337    @Override
338    public void start() throws Exception {
339    }
340
341    @Override
342    public void stop() throws Exception {
343    }
344
345    @Override
346    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
347        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
348            XATransactionId xid = (XATransactionId) entry.getKey();
349            ArrayList<Message> messageList = new ArrayList<Message>();
350            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
351
352            for (Operation op : entry.getValue()) {
353                if (op.getClass() == MessageDatabase.AddOperation.class) {
354                    MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op;
355                    Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage()
356                            .newInput()));
357                    messageList.add(msg);
358                } else {
359                    MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op;
360                    Buffer ackb = rmOp.getCommand().getAck();
361                    MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
362                    ackList.add(ack);
363                }
364            }
365
366            Message[] addedMessages = new Message[messageList.size()];
367            MessageAck[] acks = new MessageAck[ackList.size()];
368            messageList.toArray(addedMessages);
369            ackList.toArray(acks);
370            xid.setPreparedAcks(ackList);
371            theStore.trackRecoveredAcks(ackList);
372            listener.recover(xid, addedMessages, acks);
373        }
374    }
375
376    /**
377     * @param message
378     * @throws IOException
379     */
380    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
381            throws IOException {
382
383        if (message.getTransactionId() != null) {
384            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
385                destination.addMessage(context, message);
386            } else {
387                Tx tx = getTx(message.getTransactionId());
388                tx.add(new AddMessageCommand(context) {
389                    @Override
390                    public Message getMessage() {
391                        return message;
392                    }
393                    @Override
394                    public Future<Object> run(ConnectionContext ctx) throws IOException {
395                        destination.addMessage(ctx, message);
396                        return AbstractMessageStore.FUTURE;
397                    }
398
399                });
400            }
401        } else {
402            destination.addMessage(context, message);
403        }
404    }
405
406    ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
407            throws IOException {
408
409        if (message.getTransactionId() != null) {
410            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
411                destination.addMessage(context, message);
412                return AbstractMessageStore.FUTURE;
413            } else {
414                Tx tx = getTx(message.getTransactionId());
415                tx.add(new AddMessageCommand(context) {
416                    @Override
417                    public Message getMessage() {
418                        return message;
419                    }
420                    @Override
421                    public Future<Object> run(ConnectionContext ctx) throws IOException {
422                        return destination.asyncAddQueueMessage(ctx, message);
423                    }
424
425                });
426                return AbstractMessageStore.FUTURE;
427            }
428        } else {
429            return destination.asyncAddQueueMessage(context, message);
430        }
431    }
432
433    ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
434            throws IOException {
435
436        if (message.getTransactionId() != null) {
437            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
438                destination.addMessage(context, message);
439                return AbstractMessageStore.FUTURE;
440            } else {
441                Tx tx = getTx(message.getTransactionId());
442                tx.add(new AddMessageCommand(context) {
443                    @Override
444                    public Message getMessage() {
445                        return message;
446                    }
447                    @Override
448                    public Future<Object> run(ConnectionContext ctx) throws IOException {
449                        return destination.asyncAddTopicMessage(ctx, message);
450                    }
451
452                });
453                return AbstractMessageStore.FUTURE;
454            }
455        } else {
456            return destination.asyncAddTopicMessage(context, message);
457        }
458    }
459
460    /**
461     * @param ack
462     * @throws IOException
463     */
464    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
465            throws IOException {
466
467        if (ack.isInTransaction()) {
468            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
469                destination.removeMessage(context, ack);
470            } else {
471                Tx tx = getTx(ack.getTransactionId());
472                tx.add(new RemoveMessageCommand(context) {
473                    @Override
474                    public MessageAck getMessageAck() {
475                        return ack;
476                    }
477
478                    @Override
479                    public Future<Object> run(ConnectionContext ctx) throws IOException {
480                        destination.removeMessage(ctx, ack);
481                        return AbstractMessageStore.FUTURE;
482                    }
483                });
484            }
485        } else {
486            destination.removeMessage(context, ack);
487        }
488    }
489
490    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
491            throws IOException {
492
493        if (ack.isInTransaction()) {
494            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
495                destination.removeAsyncMessage(context, ack);
496            } else {
497                Tx tx = getTx(ack.getTransactionId());
498                tx.add(new RemoveMessageCommand(context) {
499                    @Override
500                    public MessageAck getMessageAck() {
501                        return ack;
502                    }
503
504                    @Override
505                    public Future<Object> run(ConnectionContext ctx) throws IOException {
506                        destination.removeMessage(ctx, ack);
507                        return AbstractMessageStore.FUTURE;
508                    }
509                });
510            }
511        } else {
512            destination.removeAsyncMessage(context, ack);
513        }
514    }
515
516    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
517                           final MessageId messageId, final MessageAck ack) throws IOException {
518
519        if (ack.isInTransaction()) {
520            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
521                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
522            } else {
523                Tx tx = getTx(ack.getTransactionId());
524                tx.add(new RemoveMessageCommand(context) {
525                    @Override
526                    public MessageAck getMessageAck() {
527                        return ack;
528                    }
529
530                    @Override
531                    public Future<Object> run(ConnectionContext ctx) throws IOException {
532                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
533                        return AbstractMessageStore.FUTURE;
534                    }
535                });
536            }
537        } else {
538            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
539        }
540    }
541
542
543    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
544        return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid));
545    }
546}