001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.Date;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030
031import org.apache.activemq.broker.Broker;
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.broker.region.BaseDestination;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.TransactionId;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.store.AbstractMessageStore;
040import org.apache.activemq.store.IndexListener;
041import org.apache.activemq.store.ListenableFuture;
042import org.apache.activemq.store.MessageStore;
043import org.apache.activemq.store.PersistenceAdapter;
044import org.apache.activemq.store.ProxyMessageStore;
045import org.apache.activemq.store.ProxyTopicMessageStore;
046import org.apache.activemq.store.TopicMessageStore;
047import org.apache.activemq.store.TransactionRecoveryListener;
048import org.apache.activemq.store.TransactionStore;
049import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
050import org.apache.activemq.store.kahadb.data.KahaEntryType;
051import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
053import org.apache.activemq.store.kahadb.disk.journal.DataFile;
054import org.apache.activemq.store.kahadb.disk.journal.Journal;
055import org.apache.activemq.store.kahadb.disk.journal.Location;
056import org.apache.activemq.usage.StoreUsage;
057import org.apache.activemq.util.DataByteArrayInputStream;
058import org.apache.activemq.util.DataByteArrayOutputStream;
059import org.apache.activemq.util.IOHelper;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063public class MultiKahaDBTransactionStore implements TransactionStore {
064    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
065    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
066    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
067    final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
068    private Journal journal;
069    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
070    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
071    private final AtomicBoolean started = new AtomicBoolean(false);
072    private final AtomicBoolean recovered = new AtomicBoolean(false);
073    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
074    private boolean checkForCorruption = true;
075    private AtomicBoolean corruptJournalDetected = new AtomicBoolean(false);
076
077    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
078        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
079    }
080
081    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
082        return new ProxyMessageStore(messageStore) {
083            @Override
084            public void addMessage(ConnectionContext context, final Message send) throws IOException {
085                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
086            }
087
088            @Override
089            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
090                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
091            }
092
093            @Override
094            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
095                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
096            }
097
098            @Override
099            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
100                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
101            }
102
103            @Override
104            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
105                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
106            }
107
108            @Override
109            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
110                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
111            }
112
113            @Override
114            public void registerIndexListener(IndexListener indexListener) {
115                getDelegate().registerIndexListener(indexListener);
116                try {
117                    if (indexListener instanceof BaseDestination) {
118                        // update queue storeUsage
119                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
120                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) {
121                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter;
122                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
123                                StoreUsage storeUsage = filteredAdapter.getUsage();
124                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
125                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
126                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
127                            }
128                        }
129                    }
130                } catch (Exception ignored) {
131                    LOG.warn("Failed to set mKahaDB destination store usage", ignored);
132                }
133            }
134        };
135    }
136
137    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
138        return new ProxyTopicMessageStore(messageStore) {
139            @Override
140            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
141                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
142            }
143
144            @Override
145            public void addMessage(ConnectionContext context, final Message send) throws IOException {
146                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
147            }
148
149            @Override
150            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
151                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
152            }
153
154            @Override
155            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
156                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
157            }
158
159            @Override
160            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
161                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
162            }
163
164            @Override
165            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
166                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
167            }
168
169            @Override
170            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
171                                    MessageId messageId, MessageAck ack) throws IOException {
172                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
173                        subscriptionName, messageId, ack);
174            }
175        };
176    }
177
178    public void deleteAllMessages() {
179        IOHelper.deleteChildren(getDirectory());
180    }
181
182    public int getJournalMaxFileLength() {
183        return journalMaxFileLength;
184    }
185
186    public void setJournalMaxFileLength(int journalMaxFileLength) {
187        this.journalMaxFileLength = journalMaxFileLength;
188    }
189
190    public int getJournalMaxWriteBatchSize() {
191        return journalWriteBatchSize;
192    }
193
194    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
195        this.journalWriteBatchSize = journalWriteBatchSize;
196    }
197
198    public void setJournalCleanupInterval(long journalCleanupInterval) {
199        this.journalCleanupInterval = journalCleanupInterval;
200    }
201
202    public long getJournalCleanupInterval() {
203        return journalCleanupInterval;
204    }
205
206    public void setCheckForCorruption(boolean checkForCorruption) {
207        this.checkForCorruption = checkForCorruption;
208    }
209
210    public boolean isCheckForCorruption() {
211        return checkForCorruption;
212    }
213
214    public class Tx {
215        private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
216        private int prepareLocationId = 0;
217
218        public void trackStore(TransactionStore store, XATransactionId xid) {
219            stores.put(store, xid);
220        }
221
222        public void trackStore(TransactionStore store) {
223            stores.put(store, null);
224        }
225
226        public HashMap<TransactionStore, TransactionId> getStoresMap() {
227            return stores;
228        }
229
230        public Set<TransactionStore> getStores() {
231            return stores.keySet();
232        }
233
234        public void trackPrepareLocation(Location location) {
235            this.prepareLocationId = location.getDataFileId();
236        }
237
238        public int getPreparedLocationId() {
239            return prepareLocationId;
240        }
241    }
242
243    public Tx getTx(TransactionId txid) {
244        Tx tx = inflightTransactions.get(txid);
245        if (tx == null) {
246            tx = new Tx();
247            inflightTransactions.put(txid, tx);
248        }
249        return tx;
250    }
251
252    public Tx removeTx(TransactionId txid) {
253        return inflightTransactions.remove(txid);
254    }
255
256    @Override
257    public void prepare(TransactionId txid) throws IOException {
258        Tx tx = getTx(txid);
259        for (TransactionStore store : tx.getStores()) {
260            store.prepare(txid);
261        }
262    }
263
264    @Override
265    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
266            throws IOException {
267
268        if (preCommit != null) {
269            preCommit.run();
270        }
271
272        Tx tx = getTx(txid);
273        if (wasPrepared) {
274            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
275                TransactionId recovered = storeTx.getValue();
276                if (recovered != null) {
277                    storeTx.getKey().commit(recovered, true, null, null);
278                } else {
279                    storeTx.getKey().commit(txid, true, null, null);
280                }
281            }
282        } else {
283            // can only do 1pc on a single store
284            if (tx.getStores().size() == 1) {
285                for (TransactionStore store : tx.getStores()) {
286                    store.commit(txid, false, null, null);
287                }
288            } else {
289                // need to do local 2pc
290                for (TransactionStore store : tx.getStores()) {
291                    store.prepare(txid);
292                }
293                persistOutcome(tx, txid);
294                for (TransactionStore store : tx.getStores()) {
295                    store.commit(txid, true, null, null);
296                }
297                persistCompletion(txid);
298            }
299        }
300        removeTx(txid);
301        if (postCommit != null) {
302            postCommit.run();
303        }
304    }
305
306    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
307        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
308        pendingCommit.put(txid, tx);
309    }
310
311    public void persistCompletion(TransactionId txid) throws IOException {
312        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
313        pendingCommit.remove(txid);
314    }
315
316    private Location store(JournalCommand<?> data) throws IOException {
317        int size = data.serializedSizeFramed();
318        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
319        os.writeByte(data.type().getNumber());
320        data.writeFramed(os);
321        Location location = journal.write(os.toByteSequence(), true);
322        journal.setLastAppendLocation(location);
323        return location;
324    }
325
326    @Override
327    public void rollback(TransactionId txid) throws IOException {
328        Tx tx = removeTx(txid);
329        if (tx != null) {
330            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
331                TransactionId recovered = storeTx.getValue();
332                if (recovered != null) {
333                    storeTx.getKey().rollback(recovered);
334                } else {
335                    storeTx.getKey().rollback(txid);
336                }
337            }
338        }
339    }
340
341    @Override
342    public void start() throws Exception {
343        if (started.compareAndSet(false, true)) {
344            journal = new Journal() {
345                @Override
346                public void cleanup() {
347                    super.cleanup();
348                    txStoreCleanup();
349                }
350            };
351            journal.setDirectory(getDirectory());
352            journal.setMaxFileLength(journalMaxFileLength);
353            journal.setWriteBatchSize(journalWriteBatchSize);
354            journal.setCleanupInterval(journalCleanupInterval);
355            journal.setCheckForCorruptionOnStartup(checkForCorruption);
356            journal.setChecksum(checkForCorruption);
357            IOHelper.mkdirs(journal.getDirectory());
358            journal.start();
359            recoverPendingLocalTransactions();
360            recovered.set(true);
361            loaded();
362        }
363    }
364
365    private void loaded() throws IOException {
366        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
367    }
368
369    private void txStoreCleanup() {
370        if (!recovered.get() || corruptJournalDetected.get()) {
371            return;
372        }
373        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
374        for (Tx tx : inflightTransactions.values()) {
375            knownDataFileIds.remove(tx.getPreparedLocationId());
376        }
377        for (Tx tx : pendingCommit.values()) {
378            knownDataFileIds.remove(tx.getPreparedLocationId());
379        }
380        try {
381            journal.removeDataFiles(knownDataFileIds);
382        } catch (Exception e) {
383            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
384        }
385    }
386
387    private File getDirectory() {
388        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
389    }
390
391    @Override
392    public void stop() throws Exception {
393        if (started.compareAndSet(true, false) && journal != null) {
394            journal.close();
395            journal = null;
396        }
397    }
398
399    private void recoverPendingLocalTransactions() throws IOException {
400
401        if (checkForCorruption) {
402            for (DataFile dataFile: journal.getFileMap().values()) {
403                if (!dataFile.getCorruptedBlocks().isEmpty()) {
404                    LOG.error("Corrupt Transaction journal records found in db-{}.log at {}", dataFile.getDataFileId(),  dataFile.getCorruptedBlocks());
405                    corruptJournalDetected.set(true);
406                }
407            }
408        }
409        if (!corruptJournalDetected.get()) {
410            Location location = null;
411            try {
412                location = journal.getNextLocation(null);
413                while (location != null) {
414                    process(location, load(location));
415                    location = journal.getNextLocation(location);
416                }
417            } catch (Exception oops) {
418                LOG.error("Corrupt journal record; unexpected exception on transaction journal replay of location:" + location, oops);
419                corruptJournalDetected.set(true);
420            }
421            pendingCommit.putAll(inflightTransactions);
422            LOG.info("pending local transactions: " + pendingCommit.keySet());
423        }
424    }
425
426    public JournalCommand<?> load(Location location) throws IOException {
427        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
428        byte readByte = is.readByte();
429        KahaEntryType type = KahaEntryType.valueOf(readByte);
430        if (type == null) {
431            throw new IOException("Could not load journal record. Invalid location: " + location);
432        }
433        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
434        message.mergeFramed(is);
435        return message;
436    }
437
438    public void process(final Location location, JournalCommand<?> command) throws IOException {
439        switch (command.type()) {
440            case KAHA_PREPARE_COMMAND:
441                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
442                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
443                break;
444            case KAHA_COMMIT_COMMAND:
445                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
446                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
447                break;
448            case KAHA_TRACE_COMMAND:
449                break;
450            default:
451                throw new IOException("Unexpected command in transaction journal: " + command);
452        }
453    }
454
455
456    @Override
457    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
458
459        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
460            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
461                @Override
462                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
463                    try {
464                        getTx(xid).trackStore(adapter.createTransactionStore(), xid);
465                    } catch (IOException e) {
466                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
467                    }
468                    listener.recover(xid, addedMessages, acks);
469                }
470            });
471        }
472
473        boolean recoveryWorkPending = false;
474        try {
475            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
476            // force completion of local xa
477            for (TransactionId txid : broker.getPreparedTransactions(null)) {
478                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
479                    recoveryWorkPending = true;
480                    if (corruptJournalDetected.get()) {
481                        // not having a record is meaningless once our tx store is corrupt; we need a heuristic decision
482                        LOG.warn("Pending multi store local transaction {} requires manual heuristic outcome via JMX", txid);
483                        logSomeContext(txid);
484                    } else {
485                        try {
486                            if (pendingCommit.keySet().contains(txid)) {
487                                // we recorded the commit outcome, finish the job
488                                LOG.info("delivering pending commit outcome for tid: " + txid);
489                                broker.commitTransaction(null, txid, false);
490                            } else {
491                                // we have not record an outcome, and would have reported a commit failure, so we must rollback
492                                LOG.info("delivering rollback outcome to store for tid: " + txid);
493                                broker.forgetTransaction(null, txid);
494                            }
495                            persistCompletion(txid);
496                        } catch (Exception ex) {
497                            LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
498                        }
499                    }
500                }
501            }
502        } catch (Exception e) {
503            LOG.error("failed to resolve pending local transactions", e);
504        }
505        // can we ignore corruption and resume
506        if (corruptJournalDetected.get() && !recoveryWorkPending) {
507            // move to new write file, gc will cleanup
508            journal.rotateWriteFile();
509            loaded();
510            corruptJournalDetected.set(false);
511            LOG.info("No heuristics outcome pending after corrupt tx store detection, auto resolving");
512        }
513    }
514
515    private void logSomeContext(TransactionId txid) throws IOException {
516        Tx tx = getTx(txid);
517        if (tx != null) {
518            for (TransactionStore store: tx.getStores()) {
519                for (PersistenceAdapter persistenceAdapter : multiKahaDBPersistenceAdapter.adapters) {
520                    if (persistenceAdapter.createTransactionStore() == store) {
521                        if (persistenceAdapter instanceof KahaDBPersistenceAdapter) {
522                            LOG.warn("Heuristic data in: " + persistenceAdapter + ", "  + ((KahaDBPersistenceAdapter)persistenceAdapter).getStore().getPreparedTransaction(txid));
523                        }
524                    }
525                }
526            }
527        }
528    }
529
530    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
531            throws IOException {
532        if (message.getTransactionId() != null) {
533            getTx(message.getTransactionId()).trackStore(transactionStore);
534        }
535        destination.addMessage(context, message);
536    }
537
538    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
539            throws IOException {
540        if (message.getTransactionId() != null) {
541            getTx(message.getTransactionId()).trackStore(transactionStore);
542            destination.addMessage(context, message);
543            return AbstractMessageStore.FUTURE;
544        } else {
545            return destination.asyncAddQueueMessage(context, message);
546        }
547    }
548
549    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
550            throws IOException {
551
552        if (message.getTransactionId() != null) {
553            getTx(message.getTransactionId()).trackStore(transactionStore);
554            destination.addMessage(context, message);
555            return AbstractMessageStore.FUTURE;
556        } else {
557            return destination.asyncAddTopicMessage(context, message);
558        }
559    }
560
561    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
562            throws IOException {
563        if (ack.getTransactionId() != null) {
564            getTx(ack.getTransactionId()).trackStore(transactionStore);
565        }
566        destination.removeMessage(context, ack);
567    }
568
569    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
570            throws IOException {
571        if (ack.getTransactionId() != null) {
572            getTx(ack.getTransactionId()).trackStore(transactionStore);
573        }
574        destination.removeAsyncMessage(context, ack);
575    }
576
577    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
578                           final String clientId, final String subscriptionName,
579                           final MessageId messageId, final MessageAck ack) throws IOException {
580        if (ack.getTransactionId() != null) {
581            getTx(ack.getTransactionId()).trackStore(transactionStore);
582        }
583        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
584    }
585
586}