001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030
031import org.apache.activemq.broker.Broker;
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.broker.region.BaseDestination;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.TransactionId;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.store.AbstractMessageStore;
040import org.apache.activemq.store.IndexListener;
041import org.apache.activemq.store.ListenableFuture;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.PersistenceAdapter;
044import org.apache.activemq.store.ProxyMessageStore;
045import org.apache.activemq.store.ProxyTopicMessageStore;
046import org.apache.activemq.store.TopicMessageStore;
047import org.apache.activemq.store.TransactionRecoveryListener;
048import org.apache.activemq.store.TransactionStore;
049import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
050import org.apache.activemq.store.kahadb.data.KahaEntryType;
051import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
053import org.apache.activemq.store.kahadb.disk.journal.Journal;
054import org.apache.activemq.store.kahadb.disk.journal.Location;
055import org.apache.activemq.usage.StoreUsage;
056import org.apache.activemq.util.DataByteArrayInputStream;
057import org.apache.activemq.util.DataByteArrayOutputStream;
058import org.apache.activemq.util.IOHelper;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062public class MultiKahaDBTransactionStore implements TransactionStore {
063    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
064    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
065    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
066    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
067    private Journal journal;
068    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
069    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
070    private final AtomicBoolean started = new AtomicBoolean(false);
071
072    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
073        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
074    }
075
076    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
077        return new ProxyMessageStore(messageStore) {
078            @Override
079            public void addMessage(ConnectionContext context, final Message send) throws IOException {
080                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
081            }
082
083            @Override
084            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
085                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
086            }
087
088            @Override
089            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
090                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
091            }
092
093            @Override
094            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
095                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
096            }
097
098            @Override
099            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
100                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
101            }
102
103            @Override
104            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
105                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
106            }
107
108            @Override
109            public void registerIndexListener(IndexListener indexListener) {
110                getDelegate().registerIndexListener(indexListener);
111                try {
112                    if (indexListener instanceof BaseDestination) {
113                        // update queue storeUsage
114                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
115                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) {
116                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter;
117                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
118                                StoreUsage storeUsage = filteredAdapter.getUsage();
119                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
120                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
121                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
122                            }
123                        }
124                    }
125                } catch (Exception ignored) {
126                    LOG.warn("Failed to set mKahaDB destination store usage", ignored);
127                }
128            }
129        };
130    }
131
132    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
133        return new ProxyTopicMessageStore(messageStore) {
134            @Override
135            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
136                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
137            }
138
139            @Override
140            public void addMessage(ConnectionContext context, final Message send) throws IOException {
141                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
142            }
143
144            @Override
145            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
146                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
147            }
148
149            @Override
150            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
151                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
152            }
153
154            @Override
155            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
156                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
157            }
158
159            @Override
160            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
161                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
162            }
163
164            @Override
165            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
166                                    MessageId messageId, MessageAck ack) throws IOException {
167                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
168                        subscriptionName, messageId, ack);
169            }
170        };
171    }
172
173    public void deleteAllMessages() {
174        IOHelper.deleteChildren(getDirectory());
175    }
176
177    public int getJournalMaxFileLength() {
178        return journalMaxFileLength;
179    }
180
181    public void setJournalMaxFileLength(int journalMaxFileLength) {
182        this.journalMaxFileLength = journalMaxFileLength;
183    }
184
185    public int getJournalMaxWriteBatchSize() {
186        return journalWriteBatchSize;
187    }
188
189    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
190        this.journalWriteBatchSize = journalWriteBatchSize;
191    }
192
193    public class Tx {
194        private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
195        private int prepareLocationId = 0;
196
197        public void trackStore(TransactionStore store, XATransactionId xid) {
198            stores.put(store, xid);
199        }
200
201        public void trackStore(TransactionStore store) {
202            stores.put(store, null);
203        }
204
205        public HashMap<TransactionStore, TransactionId> getStoresMap() {
206            return stores;
207        }
208
209        public Set<TransactionStore> getStores() {
210            return stores.keySet();
211        }
212
213        public void trackPrepareLocation(Location location) {
214            this.prepareLocationId = location.getDataFileId();
215        }
216
217        public int getPreparedLocationId() {
218            return prepareLocationId;
219        }
220    }
221
222    public Tx getTx(TransactionId txid) {
223        Tx tx = inflightTransactions.get(txid);
224        if (tx == null) {
225            tx = new Tx();
226            inflightTransactions.put(txid, tx);
227        }
228        return tx;
229    }
230
231    public Tx removeTx(TransactionId txid) {
232        return inflightTransactions.remove(txid);
233    }
234
235    @Override
236    public void prepare(TransactionId txid) throws IOException {
237        Tx tx = getTx(txid);
238        for (TransactionStore store : tx.getStores()) {
239            store.prepare(txid);
240        }
241    }
242
243    @Override
244    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
245            throws IOException {
246
247        if (preCommit != null) {
248            preCommit.run();
249        }
250
251        Tx tx = getTx(txid);
252        if (wasPrepared) {
253            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
254                TransactionId recovered = storeTx.getValue();
255                if (recovered != null) {
256                    storeTx.getKey().commit(recovered, true, null, null);
257                } else {
258                    storeTx.getKey().commit(txid, true, null, null);
259                }
260            }
261        } else {
262            // can only do 1pc on a single store
263            if (tx.getStores().size() == 1) {
264                for (TransactionStore store : tx.getStores()) {
265                    store.commit(txid, false, null, null);
266                }
267            } else {
268                // need to do local 2pc
269                for (TransactionStore store : tx.getStores()) {
270                    store.prepare(txid);
271                }
272                persistOutcome(tx, txid);
273                for (TransactionStore store : tx.getStores()) {
274                    store.commit(txid, true, null, null);
275                }
276                persistCompletion(txid);
277            }
278        }
279        removeTx(txid);
280        if (postCommit != null) {
281            postCommit.run();
282        }
283    }
284
285    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
286        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
287    }
288
289    public void persistCompletion(TransactionId txid) throws IOException {
290        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
291    }
292
293    private Location store(JournalCommand<?> data) throws IOException {
294        int size = data.serializedSizeFramed();
295        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
296        os.writeByte(data.type().getNumber());
297        data.writeFramed(os);
298        Location location = journal.write(os.toByteSequence(), true);
299        journal.setLastAppendLocation(location);
300        return location;
301    }
302
303    @Override
304    public void rollback(TransactionId txid) throws IOException {
305        Tx tx = removeTx(txid);
306        if (tx != null) {
307            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
308                TransactionId recovered = storeTx.getValue();
309                if (recovered != null) {
310                    storeTx.getKey().rollback(recovered);
311                } else {
312                    storeTx.getKey().rollback(txid);
313                }
314            }
315        }
316    }
317
318    @Override
319    public void start() throws Exception {
320        if (started.compareAndSet(false, true)) {
321            journal = new Journal() {
322                @Override
323                public void cleanup() {
324                    super.cleanup();
325                    txStoreCleanup();
326                }
327            };
328            journal.setDirectory(getDirectory());
329            journal.setMaxFileLength(journalMaxFileLength);
330            journal.setWriteBatchSize(journalWriteBatchSize);
331            IOHelper.mkdirs(journal.getDirectory());
332            journal.start();
333            recoverPendingLocalTransactions();
334            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
335        }
336    }
337
338    private void txStoreCleanup() {
339        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
340        for (Tx tx : inflightTransactions.values()) {
341            knownDataFileIds.remove(tx.getPreparedLocationId());
342        }
343        try {
344            journal.removeDataFiles(knownDataFileIds);
345        } catch (Exception e) {
346            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
347        }
348    }
349
350    private File getDirectory() {
351        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
352    }
353
354    @Override
355    public void stop() throws Exception {
356        if (started.compareAndSet(true, false) && journal != null) {
357            journal.close();
358            journal = null;
359        }
360    }
361
362    private void recoverPendingLocalTransactions() throws IOException {
363        Location location = journal.getNextLocation(null);
364        while (location != null) {
365            process(load(location));
366            location = journal.getNextLocation(location);
367        }
368        recoveredPendingCommit.addAll(inflightTransactions.keySet());
369        LOG.info("pending local transactions: " + recoveredPendingCommit);
370    }
371
372    public JournalCommand<?> load(Location location) throws IOException {
373        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
374        byte readByte = is.readByte();
375        KahaEntryType type = KahaEntryType.valueOf(readByte);
376        if (type == null) {
377            throw new IOException("Could not load journal record. Invalid location: " + location);
378        }
379        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
380        message.mergeFramed(is);
381        return message;
382    }
383
384    public void process(JournalCommand<?> command) throws IOException {
385        switch (command.type()) {
386            case KAHA_PREPARE_COMMAND:
387                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
388                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
389                break;
390            case KAHA_COMMIT_COMMAND:
391                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
392                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
393                break;
394            case KAHA_TRACE_COMMAND:
395                break;
396            default:
397                throw new IOException("Unexpected command in transaction journal: " + command);
398        }
399    }
400
401
402    @Override
403    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
404
405        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
406            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
407                @Override
408                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
409                    try {
410                        getTx(xid).trackStore(adapter.createTransactionStore(), xid);
411                    } catch (IOException e) {
412                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
413                    }
414                    listener.recover(xid, addedMessages, acks);
415                }
416            });
417        }
418
419        try {
420            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
421            // force completion of local xa
422            for (TransactionId txid : broker.getPreparedTransactions(null)) {
423                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
424                    try {
425                        if (recoveredPendingCommit.contains(txid)) {
426                            LOG.info("delivering pending commit outcome for tid: " + txid);
427                            broker.commitTransaction(null, txid, false);
428
429                        } else {
430                            LOG.info("delivering rollback outcome to store for tid: " + txid);
431                            broker.forgetTransaction(null, txid);
432                        }
433                        persistCompletion(txid);
434                    } catch (Exception ex) {
435                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
436                    }
437                }
438            }
439        } catch (Exception e) {
440            LOG.error("failed to resolve pending local transactions", e);
441        }
442    }
443
444    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
445            throws IOException {
446        if (message.getTransactionId() != null) {
447            getTx(message.getTransactionId()).trackStore(transactionStore);
448        }
449        destination.addMessage(context, message);
450    }
451
452    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
453            throws IOException {
454        if (message.getTransactionId() != null) {
455            getTx(message.getTransactionId()).trackStore(transactionStore);
456            destination.addMessage(context, message);
457            return AbstractMessageStore.FUTURE;
458        } else {
459            return destination.asyncAddQueueMessage(context, message);
460        }
461    }
462
463    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
464            throws IOException {
465
466        if (message.getTransactionId() != null) {
467            getTx(message.getTransactionId()).trackStore(transactionStore);
468            destination.addMessage(context, message);
469            return AbstractMessageStore.FUTURE;
470        } else {
471            return destination.asyncAddTopicMessage(context, message);
472        }
473    }
474
475    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
476            throws IOException {
477        if (ack.getTransactionId() != null) {
478            getTx(ack.getTransactionId()).trackStore(transactionStore);
479        }
480        destination.removeMessage(context, ack);
481    }
482
483    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
484            throws IOException {
485        if (ack.getTransactionId() != null) {
486            getTx(ack.getTransactionId()).trackStore(transactionStore);
487        }
488        destination.removeAsyncMessage(context, ack);
489    }
490
491    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
492                           final String clientId, final String subscriptionName,
493                           final MessageId messageId, final MessageAck ack) throws IOException {
494        if (ack.getTransactionId() != null) {
495            getTx(ack.getTransactionId()).trackStore(transactionStore);
496        }
497        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
498    }
499
500}