001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030
031import org.apache.activemq.broker.Broker;
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.broker.region.BaseDestination;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.TransactionId;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.store.AbstractMessageStore;
040import org.apache.activemq.store.IndexListener;
041import org.apache.activemq.store.ListenableFuture;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.PersistenceAdapter;
044import org.apache.activemq.store.ProxyMessageStore;
045import org.apache.activemq.store.ProxyTopicMessageStore;
046import org.apache.activemq.store.TopicMessageStore;
047import org.apache.activemq.store.TransactionRecoveryListener;
048import org.apache.activemq.store.TransactionStore;
049import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
050import org.apache.activemq.store.kahadb.data.KahaEntryType;
051import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
053import org.apache.activemq.store.kahadb.disk.journal.Journal;
054import org.apache.activemq.store.kahadb.disk.journal.Location;
055import org.apache.activemq.usage.StoreUsage;
056import org.apache.activemq.util.DataByteArrayInputStream;
057import org.apache.activemq.util.DataByteArrayOutputStream;
058import org.apache.activemq.util.IOHelper;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062public class MultiKahaDBTransactionStore implements TransactionStore {
063    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
064    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
065    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
066    final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
067    private Journal journal;
068    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
069    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
070    private final AtomicBoolean started = new AtomicBoolean(false);
071    private final AtomicBoolean recovered = new AtomicBoolean(false);
072    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
073
074    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
075        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
076    }
077
078    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
079        return new ProxyMessageStore(messageStore) {
080            @Override
081            public void addMessage(ConnectionContext context, final Message send) throws IOException {
082                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
083            }
084
085            @Override
086            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
087                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
088            }
089
090            @Override
091            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
092                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
093            }
094
095            @Override
096            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
097                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
098            }
099
100            @Override
101            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
102                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
103            }
104
105            @Override
106            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
107                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
108            }
109
110            @Override
111            public void registerIndexListener(IndexListener indexListener) {
112                getDelegate().registerIndexListener(indexListener);
113                try {
114                    if (indexListener instanceof BaseDestination) {
115                        // update queue storeUsage
116                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
117                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) {
118                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter;
119                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
120                                StoreUsage storeUsage = filteredAdapter.getUsage();
121                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
122                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
123                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
124                            }
125                        }
126                    }
127                } catch (Exception ignored) {
128                    LOG.warn("Failed to set mKahaDB destination store usage", ignored);
129                }
130            }
131        };
132    }
133
134    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
135        return new ProxyTopicMessageStore(messageStore) {
136            @Override
137            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
138                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
139            }
140
141            @Override
142            public void addMessage(ConnectionContext context, final Message send) throws IOException {
143                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
144            }
145
146            @Override
147            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
148                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
149            }
150
151            @Override
152            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
153                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
154            }
155
156            @Override
157            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
158                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
159            }
160
161            @Override
162            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
163                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
164            }
165
166            @Override
167            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
168                                    MessageId messageId, MessageAck ack) throws IOException {
169                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
170                        subscriptionName, messageId, ack);
171            }
172        };
173    }
174
175    public void deleteAllMessages() {
176        IOHelper.deleteChildren(getDirectory());
177    }
178
179    public int getJournalMaxFileLength() {
180        return journalMaxFileLength;
181    }
182
183    public void setJournalMaxFileLength(int journalMaxFileLength) {
184        this.journalMaxFileLength = journalMaxFileLength;
185    }
186
187    public int getJournalMaxWriteBatchSize() {
188        return journalWriteBatchSize;
189    }
190
191    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
192        this.journalWriteBatchSize = journalWriteBatchSize;
193    }
194
195    public void setJournalCleanupInterval(long journalCleanupInterval) {
196        this.journalCleanupInterval = journalCleanupInterval;
197    }
198
199    public long getJournalCleanupInterval() {
200        return journalCleanupInterval;
201    }
202
203    public class Tx {
204        private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
205        private int prepareLocationId = 0;
206
207        public void trackStore(TransactionStore store, XATransactionId xid) {
208            stores.put(store, xid);
209        }
210
211        public void trackStore(TransactionStore store) {
212            stores.put(store, null);
213        }
214
215        public HashMap<TransactionStore, TransactionId> getStoresMap() {
216            return stores;
217        }
218
219        public Set<TransactionStore> getStores() {
220            return stores.keySet();
221        }
222
223        public void trackPrepareLocation(Location location) {
224            this.prepareLocationId = location.getDataFileId();
225        }
226
227        public int getPreparedLocationId() {
228            return prepareLocationId;
229        }
230    }
231
232    public Tx getTx(TransactionId txid) {
233        Tx tx = inflightTransactions.get(txid);
234        if (tx == null) {
235            tx = new Tx();
236            inflightTransactions.put(txid, tx);
237        }
238        return tx;
239    }
240
241    public Tx removeTx(TransactionId txid) {
242        return inflightTransactions.remove(txid);
243    }
244
245    @Override
246    public void prepare(TransactionId txid) throws IOException {
247        Tx tx = getTx(txid);
248        for (TransactionStore store : tx.getStores()) {
249            store.prepare(txid);
250        }
251    }
252
253    @Override
254    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
255            throws IOException {
256
257        if (preCommit != null) {
258            preCommit.run();
259        }
260
261        Tx tx = getTx(txid);
262        if (wasPrepared) {
263            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
264                TransactionId recovered = storeTx.getValue();
265                if (recovered != null) {
266                    storeTx.getKey().commit(recovered, true, null, null);
267                } else {
268                    storeTx.getKey().commit(txid, true, null, null);
269                }
270            }
271        } else {
272            // can only do 1pc on a single store
273            if (tx.getStores().size() == 1) {
274                for (TransactionStore store : tx.getStores()) {
275                    store.commit(txid, false, null, null);
276                }
277            } else {
278                // need to do local 2pc
279                for (TransactionStore store : tx.getStores()) {
280                    store.prepare(txid);
281                }
282                persistOutcome(tx, txid);
283                for (TransactionStore store : tx.getStores()) {
284                    store.commit(txid, true, null, null);
285                }
286                persistCompletion(txid);
287            }
288        }
289        removeTx(txid);
290        if (postCommit != null) {
291            postCommit.run();
292        }
293    }
294
295    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
296        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
297        pendingCommit.put(txid, tx);
298    }
299
300    public void persistCompletion(TransactionId txid) throws IOException {
301        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
302        pendingCommit.remove(txid);
303    }
304
305    private Location store(JournalCommand<?> data) throws IOException {
306        int size = data.serializedSizeFramed();
307        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
308        os.writeByte(data.type().getNumber());
309        data.writeFramed(os);
310        Location location = journal.write(os.toByteSequence(), true);
311        journal.setLastAppendLocation(location);
312        return location;
313    }
314
315    @Override
316    public void rollback(TransactionId txid) throws IOException {
317        Tx tx = removeTx(txid);
318        if (tx != null) {
319            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
320                TransactionId recovered = storeTx.getValue();
321                if (recovered != null) {
322                    storeTx.getKey().rollback(recovered);
323                } else {
324                    storeTx.getKey().rollback(txid);
325                }
326            }
327        }
328    }
329
330    @Override
331    public void start() throws Exception {
332        if (started.compareAndSet(false, true)) {
333            journal = new Journal() {
334                @Override
335                public void cleanup() {
336                    super.cleanup();
337                    txStoreCleanup();
338                }
339            };
340            journal.setDirectory(getDirectory());
341            journal.setMaxFileLength(journalMaxFileLength);
342            journal.setWriteBatchSize(journalWriteBatchSize);
343            journal.setCleanupInterval(journalCleanupInterval);
344            IOHelper.mkdirs(journal.getDirectory());
345            journal.start();
346            recoverPendingLocalTransactions();
347            recovered.set(true);
348            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
349        }
350    }
351
352    private void txStoreCleanup() {
353        if (!recovered.get()) {
354            return;
355        }
356        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
357        for (Tx tx : inflightTransactions.values()) {
358            knownDataFileIds.remove(tx.getPreparedLocationId());
359        }
360        for (Tx tx : pendingCommit.values()) {
361            knownDataFileIds.remove(tx.getPreparedLocationId());
362        }
363        try {
364            journal.removeDataFiles(knownDataFileIds);
365        } catch (Exception e) {
366            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
367        }
368    }
369
370    private File getDirectory() {
371        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
372    }
373
374    @Override
375    public void stop() throws Exception {
376        if (started.compareAndSet(true, false) && journal != null) {
377            journal.close();
378            journal = null;
379        }
380    }
381
382    private void recoverPendingLocalTransactions() throws IOException {
383        Location location = journal.getNextLocation(null);
384        while (location != null) {
385            process(location, load(location));
386            location = journal.getNextLocation(location);
387        }
388        pendingCommit.putAll(inflightTransactions);
389        LOG.info("pending local transactions: " + pendingCommit.keySet());
390    }
391
392    public JournalCommand<?> load(Location location) throws IOException {
393        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
394        byte readByte = is.readByte();
395        KahaEntryType type = KahaEntryType.valueOf(readByte);
396        if (type == null) {
397            throw new IOException("Could not load journal record. Invalid location: " + location);
398        }
399        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
400        message.mergeFramed(is);
401        return message;
402    }
403
404    public void process(final Location location, JournalCommand<?> command) throws IOException {
405        switch (command.type()) {
406            case KAHA_PREPARE_COMMAND:
407                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
408                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
409                break;
410            case KAHA_COMMIT_COMMAND:
411                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
412                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
413                break;
414            case KAHA_TRACE_COMMAND:
415                break;
416            default:
417                throw new IOException("Unexpected command in transaction journal: " + command);
418        }
419    }
420
421
422    @Override
423    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
424
425        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
426            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
427                @Override
428                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
429                    try {
430                        getTx(xid).trackStore(adapter.createTransactionStore(), xid);
431                    } catch (IOException e) {
432                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
433                    }
434                    listener.recover(xid, addedMessages, acks);
435                }
436            });
437        }
438
439        try {
440            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
441            // force completion of local xa
442            for (TransactionId txid : broker.getPreparedTransactions(null)) {
443                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
444                    try {
445                        if (pendingCommit.keySet().contains(txid)) {
446                            LOG.info("delivering pending commit outcome for tid: " + txid);
447                            broker.commitTransaction(null, txid, false);
448                        } else {
449                            LOG.info("delivering rollback outcome to store for tid: " + txid);
450                            broker.forgetTransaction(null, txid);
451                        }
452                        persistCompletion(txid);
453                    } catch (Exception ex) {
454                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
455                    }
456                }
457            }
458        } catch (Exception e) {
459            LOG.error("failed to resolve pending local transactions", e);
460        }
461    }
462
463    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
464            throws IOException {
465        if (message.getTransactionId() != null) {
466            getTx(message.getTransactionId()).trackStore(transactionStore);
467        }
468        destination.addMessage(context, message);
469    }
470
471    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
472            throws IOException {
473        if (message.getTransactionId() != null) {
474            getTx(message.getTransactionId()).trackStore(transactionStore);
475            destination.addMessage(context, message);
476            return AbstractMessageStore.FUTURE;
477        } else {
478            return destination.asyncAddQueueMessage(context, message);
479        }
480    }
481
482    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
483            throws IOException {
484
485        if (message.getTransactionId() != null) {
486            getTx(message.getTransactionId()).trackStore(transactionStore);
487            destination.addMessage(context, message);
488            return AbstractMessageStore.FUTURE;
489        } else {
490            return destination.asyncAddTopicMessage(context, message);
491        }
492    }
493
494    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
495            throws IOException {
496        if (ack.getTransactionId() != null) {
497            getTx(ack.getTransactionId()).trackStore(transactionStore);
498        }
499        destination.removeMessage(context, ack);
500    }
501
502    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
503            throws IOException {
504        if (ack.getTransactionId() != null) {
505            getTx(ack.getTransactionId()).trackStore(transactionStore);
506        }
507        destination.removeAsyncMessage(context, ack);
508    }
509
510    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
511                           final String clientId, final String subscriptionName,
512                           final MessageId messageId, final MessageAck ack) throws IOException {
513        if (ack.getTransactionId() != null) {
514            getTx(ack.getTransactionId()).trackStore(transactionStore);
515        }
516        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
517    }
518
519}