001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CancellationException;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.Future;
031
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.command.ConsumerId;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.TransactionId;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.protobuf.Buffer;
040import org.apache.activemq.store.AbstractMessageStore;
041import org.apache.activemq.store.ListenableFuture;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.ProxyMessageStore;
044import org.apache.activemq.store.ProxyTopicMessageStore;
045import org.apache.activemq.store.TopicMessageStore;
046import org.apache.activemq.store.TransactionRecoveryListener;
047import org.apache.activemq.store.TransactionStore;
048import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
049import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
050import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
051import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
052import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
053import org.apache.activemq.wireformat.WireFormat;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * Provides a TransactionStore implementation that can create transaction aware
059 * MessageStore objects from non transaction aware MessageStore objects.
060 *
061 *
062 */
063public class KahaDBTransactionStore implements TransactionStore {
064    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
065    ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
066    private final KahaDBStore theStore;
067
068    public KahaDBTransactionStore(KahaDBStore theStore) {
069        this.theStore = theStore;
070    }
071
072    private WireFormat wireFormat(){
073      return this.theStore.wireFormat;
074    }
075
076    public class Tx {
077        private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
078
079        private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
080
081        public void add(AddMessageCommand msg) {
082            messages.add(msg);
083        }
084
085        public void add(RemoveMessageCommand ack) {
086            acks.add(ack);
087        }
088
089        public Message[] getMessages() {
090            Message rc[] = new Message[messages.size()];
091            int count = 0;
092            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
093                AddMessageCommand cmd = iter.next();
094                rc[count++] = cmd.getMessage();
095            }
096            return rc;
097        }
098
099        public MessageAck[] getAcks() {
100            MessageAck rc[] = new MessageAck[acks.size()];
101            int count = 0;
102            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
103                RemoveMessageCommand cmd = iter.next();
104                rc[count++] = cmd.getMessageAck();
105            }
106            return rc;
107        }
108
109        /**
110         * @return true if something to commit
111         * @throws IOException
112         */
113        public List<Future<Object>> commit() throws IOException {
114            List<Future<Object>> results = new ArrayList<Future<Object>>();
115            // Do all the message adds.
116            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
117                AddMessageCommand cmd = iter.next();
118                results.add(cmd.run());
119
120            }
121            // And removes..
122            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
123                RemoveMessageCommand cmd = iter.next();
124                cmd.run();
125                results.add(cmd.run());
126            }
127
128            return results;
129        }
130    }
131
132    public abstract class AddMessageCommand {
133        private final ConnectionContext ctx;
134        AddMessageCommand(ConnectionContext ctx) {
135            this.ctx = ctx;
136        }
137        abstract Message getMessage();
138        Future<Object> run() throws IOException {
139            return run(this.ctx);
140        }
141        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
142    }
143
144    public abstract class RemoveMessageCommand {
145
146        private final ConnectionContext ctx;
147        RemoveMessageCommand(ConnectionContext ctx) {
148            this.ctx = ctx;
149        }
150        abstract MessageAck getMessageAck();
151        Future<Object> run() throws IOException {
152            return run(this.ctx);
153        }
154        abstract Future<Object> run(ConnectionContext context) throws IOException;
155    }
156
157    public MessageStore proxy(MessageStore messageStore) {
158        return new ProxyMessageStore(messageStore) {
159            @Override
160            public void addMessage(ConnectionContext context, final Message send) throws IOException {
161                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
162            }
163
164            @Override
165            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
166                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
167            }
168
169            @Override
170            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
171                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
172            }
173
174            @Override
175            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
176                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
177            }
178
179            @Override
180            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
181                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
182            }
183
184            @Override
185            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
186                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
187            }
188        };
189    }
190
191    public TopicMessageStore proxy(TopicMessageStore messageStore) {
192        return new ProxyTopicMessageStore(messageStore) {
193            @Override
194            public void addMessage(ConnectionContext context, final Message send) throws IOException {
195                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
196            }
197
198            @Override
199            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
200                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
201            }
202
203            @Override
204            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
205                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
206            }
207
208            @Override
209            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
210                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
211            }
212
213            @Override
214            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
215                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
216            }
217
218            @Override
219            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
220                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
221            }
222
223            @Override
224            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
225                            MessageId messageId, MessageAck ack) throws IOException {
226                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
227                        subscriptionName, messageId, ack);
228            }
229
230        };
231    }
232
233    /**
234     * @throws IOException
235     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
236     */
237    @Override
238    public void prepare(TransactionId txid) throws IOException {
239        KahaTransactionInfo info = getTransactionInfo(txid);
240        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
241            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
242        } else {
243            Tx tx = inflightTransactions.remove(txid);
244            if (tx != null) {
245               theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
246            }
247        }
248    }
249
250    public Tx getTx(Object txid) {
251        Tx tx = inflightTransactions.get(txid);
252        if (tx == null) {
253            synchronized (inflightTransactions) {
254                tx = inflightTransactions.get(txid);
255                if (tx == null) {
256                    tx = new Tx();
257                    inflightTransactions.put(txid, tx);
258                }
259            }
260        }
261        return tx;
262    }
263
264    @Override
265    public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
266            throws IOException {
267        if (txid != null) {
268            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
269                if (preCommit != null) {
270                    preCommit.run();
271                }
272                Tx tx = inflightTransactions.remove(txid);
273                if (tx != null) {
274                    List<Future<Object>> results = tx.commit();
275                    boolean doneSomething = false;
276                    for (Future<Object> result : results) {
277                        try {
278                            result.get();
279                        } catch (InterruptedException e) {
280                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
281                        } catch (ExecutionException e) {
282                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
283                        }catch(CancellationException e) {
284                        }
285                        if (!result.isCancelled()) {
286                            doneSomething = true;
287                        }
288                    }
289                    if (postCommit != null) {
290                        postCommit.run();
291                    }
292                    if (doneSomething) {
293                        KahaTransactionInfo info = getTransactionInfo(txid);
294                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
295                    }
296                }else {
297                    //The Tx will be null for failed over clients - lets run their post commits
298                    if (postCommit != null) {
299                        postCommit.run();
300                    }
301                }
302
303            } else {
304                KahaTransactionInfo info = getTransactionInfo(txid);
305                if (preCommit != null) {
306                    preCommit.run();
307                }
308                theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
309                forgetRecoveredAcks(txid, false);
310            }
311        }else {
312           LOG.error("Null transaction passed on commit");
313        }
314    }
315
316    /**
317     * @throws IOException
318     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
319     */
320    @Override
321    public void rollback(TransactionId txid) throws IOException {
322        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
323            KahaTransactionInfo info = getTransactionInfo(txid);
324            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
325            forgetRecoveredAcks(txid, true);
326        } else {
327            inflightTransactions.remove(txid);
328        }
329    }
330
331    protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
332        if (txid.isXATransaction()) {
333            XATransactionId xaTid = ((XATransactionId) txid);
334            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback);
335        }
336    }
337
338    @Override
339    public void start() throws Exception {
340    }
341
342    @Override
343    public void stop() throws Exception {
344    }
345
346    @Override
347    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
348        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
349            XATransactionId xid = (XATransactionId) entry.getKey();
350            ArrayList<Message> messageList = new ArrayList<Message>();
351            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
352
353            for (Operation op : entry.getValue()) {
354                if (op.getClass() == MessageDatabase.AddOperation.class) {
355                    MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op;
356                    Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage()
357                            .newInput()));
358                    messageList.add(msg);
359                } else {
360                    MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op;
361                    Buffer ackb = rmOp.getCommand().getAck();
362                    MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
363                    // allow the ack to be tracked back to its durable sub
364                    ConsumerId subKey = new ConsumerId();
365                    subKey.setConnectionId(rmOp.getCommand().getSubscriptionKey());
366                    ack.setConsumerId(subKey);
367                    ackList.add(ack);
368                }
369            }
370
371            Message[] addedMessages = new Message[messageList.size()];
372            MessageAck[] acks = new MessageAck[ackList.size()];
373            messageList.toArray(addedMessages);
374            ackList.toArray(acks);
375            xid.setPreparedAcks(ackList);
376            theStore.trackRecoveredAcks(ackList);
377            listener.recover(xid, addedMessages, acks);
378        }
379    }
380
381    /**
382     * @param message
383     * @throws IOException
384     */
385    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
386            throws IOException {
387
388        if (message.getTransactionId() != null) {
389            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
390                destination.addMessage(context, message);
391            } else {
392                Tx tx = getTx(message.getTransactionId());
393                tx.add(new AddMessageCommand(context) {
394                    @Override
395                    public Message getMessage() {
396                        return message;
397                    }
398                    @Override
399                    public Future<Object> run(ConnectionContext ctx) throws IOException {
400                        destination.addMessage(ctx, message);
401                        return AbstractMessageStore.FUTURE;
402                    }
403
404                });
405            }
406        } else {
407            destination.addMessage(context, message);
408        }
409    }
410
411    ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
412            throws IOException {
413
414        if (message.getTransactionId() != null) {
415            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
416                destination.addMessage(context, message);
417                return AbstractMessageStore.FUTURE;
418            } else {
419                Tx tx = getTx(message.getTransactionId());
420                tx.add(new AddMessageCommand(context) {
421                    @Override
422                    public Message getMessage() {
423                        return message;
424                    }
425                    @Override
426                    public Future<Object> run(ConnectionContext ctx) throws IOException {
427                        return destination.asyncAddQueueMessage(ctx, message);
428                    }
429
430                });
431                return AbstractMessageStore.FUTURE;
432            }
433        } else {
434            return destination.asyncAddQueueMessage(context, message);
435        }
436    }
437
438    ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
439            throws IOException {
440
441        if (message.getTransactionId() != null) {
442            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
443                destination.addMessage(context, message);
444                return AbstractMessageStore.FUTURE;
445            } else {
446                Tx tx = getTx(message.getTransactionId());
447                tx.add(new AddMessageCommand(context) {
448                    @Override
449                    public Message getMessage() {
450                        return message;
451                    }
452                    @Override
453                    public Future<Object> run(ConnectionContext ctx) throws IOException {
454                        return destination.asyncAddTopicMessage(ctx, message);
455                    }
456
457                });
458                return AbstractMessageStore.FUTURE;
459            }
460        } else {
461            return destination.asyncAddTopicMessage(context, message);
462        }
463    }
464
465    /**
466     * @param ack
467     * @throws IOException
468     */
469    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
470            throws IOException {
471
472        if (ack.isInTransaction()) {
473            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
474                destination.removeMessage(context, ack);
475            } else {
476                Tx tx = getTx(ack.getTransactionId());
477                tx.add(new RemoveMessageCommand(context) {
478                    @Override
479                    public MessageAck getMessageAck() {
480                        return ack;
481                    }
482
483                    @Override
484                    public Future<Object> run(ConnectionContext ctx) throws IOException {
485                        destination.removeMessage(ctx, ack);
486                        return AbstractMessageStore.FUTURE;
487                    }
488                });
489            }
490        } else {
491            destination.removeMessage(context, ack);
492        }
493    }
494
495    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
496            throws IOException {
497
498        if (ack.isInTransaction()) {
499            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
500                destination.removeAsyncMessage(context, ack);
501            } else {
502                Tx tx = getTx(ack.getTransactionId());
503                tx.add(new RemoveMessageCommand(context) {
504                    @Override
505                    public MessageAck getMessageAck() {
506                        return ack;
507                    }
508
509                    @Override
510                    public Future<Object> run(ConnectionContext ctx) throws IOException {
511                        destination.removeMessage(ctx, ack);
512                        return AbstractMessageStore.FUTURE;
513                    }
514                });
515            }
516        } else {
517            destination.removeAsyncMessage(context, ack);
518        }
519    }
520
521    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
522                           final MessageId messageId, final MessageAck ack) throws IOException {
523
524        if (ack.isInTransaction()) {
525            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
526                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
527            } else {
528                Tx tx = getTx(ack.getTransactionId());
529                tx.add(new RemoveMessageCommand(context) {
530                    @Override
531                    public MessageAck getMessageAck() {
532                        return ack;
533                    }
534
535                    @Override
536                    public Future<Object> run(ConnectionContext ctx) throws IOException {
537                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
538                        return AbstractMessageStore.FUTURE;
539                    }
540                });
541            }
542        } else {
543            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
544        }
545    }
546
547
548    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
549        return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid));
550    }
551}