001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicReference;
043
044import org.apache.activemq.broker.ConnectionContext;
045import org.apache.activemq.broker.region.BaseDestination;
046import org.apache.activemq.broker.scheduler.JobSchedulerStore;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQQueue;
049import org.apache.activemq.command.ActiveMQTempQueue;
050import org.apache.activemq.command.ActiveMQTempTopic;
051import org.apache.activemq.command.ActiveMQTopic;
052import org.apache.activemq.command.Message;
053import org.apache.activemq.command.MessageAck;
054import org.apache.activemq.command.MessageId;
055import org.apache.activemq.command.ProducerId;
056import org.apache.activemq.command.SubscriptionInfo;
057import org.apache.activemq.command.TransactionId;
058import org.apache.activemq.openwire.OpenWireFormat;
059import org.apache.activemq.protobuf.Buffer;
060import org.apache.activemq.store.AbstractMessageStore;
061import org.apache.activemq.store.IndexListener;
062import org.apache.activemq.store.ListenableFuture;
063import org.apache.activemq.store.MessageRecoveryListener;
064import org.apache.activemq.store.MessageStore;
065import org.apache.activemq.store.PersistenceAdapter;
066import org.apache.activemq.store.TopicMessageStore;
067import org.apache.activemq.store.TransactionIdTransformer;
068import org.apache.activemq.store.TransactionStore;
069import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
070import org.apache.activemq.store.kahadb.data.KahaDestination;
071import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
072import org.apache.activemq.store.kahadb.data.KahaLocation;
073import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
074import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
075import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
076import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
077import org.apache.activemq.store.kahadb.disk.journal.Location;
078import org.apache.activemq.store.kahadb.disk.page.Transaction;
079import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
080import org.apache.activemq.usage.MemoryUsage;
081import org.apache.activemq.usage.SystemUsage;
082import org.apache.activemq.util.ServiceStopper;
083import org.apache.activemq.util.ThreadPoolUtils;
084import org.apache.activemq.wireformat.WireFormat;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
089    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
090    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
091
092    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
093    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
094            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
095    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
096    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
097            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
098
    /** Executor that runs async queue store tasks; created in doStart() and cleared in doStop(). */
    protected ExecutorService queueExecutor;
    /** Executor that runs async topic store tasks; created in doStart() and cleared in doStop(). */
    protected ExecutorService topicExecutor;
    /** Maps of pending async queue tasks; doStop() cancels every task in them and clears the list. */
    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    /** Maps of pending async topic tasks; doStop() cancels every task in them and clears the list. */
    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
    /** Wire format used to marshal messages and acks; doStart() sets its version when a broker service is present. */
    final WireFormat wireFormat = new OpenWireFormat();
    /** Usage manager handed in through setUsageManager(). */
    private SystemUsage usageManager;
    /** Bounded work queue backing queueExecutor; sized by getMaxAsyncJobs() in doStart(). */
    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
    /** Bounded work queue backing topicExecutor; sized by getMaxAsyncJobs() in doStart(). */
    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
    /** Created in doStart() with getMaxAsyncJobs() permits; doStop() tries to take all of them. */
    Semaphore globalQueueSemaphore;
    /** Created in doStart() with getMaxAsyncJobs() permits; doStop() tries to take all of them. */
    Semaphore globalTopicSemaphore;
    /** When true, queue adds and removes go through the async task path. */
    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when cache is exhausted if store is out
    // of order w.r.t cache
    private boolean concurrentStoreAndDispatchTopics = false;
    /** Fixed to false; exposed read-only through isConcurrentStoreAndDispatchTransactions(). */
    private final boolean concurrentStoreAndDispatchTransactions = false;
    /** Number of permits and queue slots used when doStart() builds the async machinery. */
    private int maxAsyncJobs = MAX_ASYNC_JOBS;
    /** Transaction store bound to this adapter; returned by createTransactionStore(). */
    private final KahaDBTransactionStore transactionStore;
    /** Applied to transaction ids before they are put into journal commands; identity by default. */
    private TransactionIdTransformer transactionIdTransformer;
117
118    public KahaDBStore() {
119        this.transactionStore = new KahaDBTransactionStore(this);
120        this.transactionIdTransformer = new TransactionIdTransformer() {
121            @Override
122            public TransactionId transform(TransactionId txid) {
123                return txid;
124            }
125        };
126    }
127
128    @Override
129    public String toString() {
130        return "KahaDB:[" + directory.getAbsolutePath() + "]";
131    }
132
133    @Override
134    public void setBrokerName(String brokerName) {
135    }
136
137    @Override
138    public void setUsageManager(SystemUsage usageManager) {
139        this.usageManager = usageManager;
140    }
141
142    public SystemUsage getUsageManager() {
143        return this.usageManager;
144    }
145
146    /**
147     * @return the concurrentStoreAndDispatch
148     */
149    public boolean isConcurrentStoreAndDispatchQueues() {
150        return this.concurrentStoreAndDispatchQueues;
151    }
152
153    /**
154     * @param concurrentStoreAndDispatch
155     *            the concurrentStoreAndDispatch to set
156     */
157    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
158        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
159    }
160
161    /**
162     * @return the concurrentStoreAndDispatch
163     */
164    public boolean isConcurrentStoreAndDispatchTopics() {
165        return this.concurrentStoreAndDispatchTopics;
166    }
167
168    /**
169     * @param concurrentStoreAndDispatch
170     *            the concurrentStoreAndDispatch to set
171     */
172    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
173        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
174    }
175
176    public boolean isConcurrentStoreAndDispatchTransactions() {
177        return this.concurrentStoreAndDispatchTransactions;
178    }
179
180    /**
181     * @return the maxAsyncJobs
182     */
183    public int getMaxAsyncJobs() {
184        return this.maxAsyncJobs;
185    }
186
187    /**
188     * @param maxAsyncJobs
189     *            the maxAsyncJobs to set
190     */
191    public void setMaxAsyncJobs(int maxAsyncJobs) {
192        this.maxAsyncJobs = maxAsyncJobs;
193    }
194
195    @Override
196    public void doStart() throws Exception {
197        if (brokerService != null) {
198            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
199            wireFormat.setVersion(metadata.openwireVersion);
200
201            if (LOG.isDebugEnabled()) {
202                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
203            }
204
205        }
206        super.doStart();
207
208        if (brokerService != null) {
209            // In case the recovered store used a different OpenWire version log a warning
210            // to assist in determining why journal reads fail.
211            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
212                LOG.warn("Recovered Store uses a different OpenWire version[{}] " +
213                         "than the version configured[{}].",
214                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
215            }
216        }
217
218        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
219        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
220        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
221        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
222        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
223            asyncQueueJobQueue, new ThreadFactory() {
224                @Override
225                public Thread newThread(Runnable runnable) {
226                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
227                    thread.setDaemon(true);
228                    return thread;
229                }
230            });
231        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
232            asyncTopicJobQueue, new ThreadFactory() {
233                @Override
234                public Thread newThread(Runnable runnable) {
235                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
236                    thread.setDaemon(true);
237                    return thread;
238                }
239            });
240    }
241
242    @Override
243    public void doStop(ServiceStopper stopper) throws Exception {
244        // drain down async jobs
245        LOG.info("Stopping async queue tasks");
246        if (this.globalQueueSemaphore != null) {
247            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
248        }
249        synchronized (this.asyncQueueMaps) {
250            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
251                synchronized (m) {
252                    for (StoreTask task : m.values()) {
253                        task.cancel();
254                    }
255                }
256            }
257            this.asyncQueueMaps.clear();
258        }
259        LOG.info("Stopping async topic tasks");
260        if (this.globalTopicSemaphore != null) {
261            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
262        }
263        synchronized (this.asyncTopicMaps) {
264            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
265                synchronized (m) {
266                    for (StoreTask task : m.values()) {
267                        task.cancel();
268                    }
269                }
270            }
271            this.asyncTopicMaps.clear();
272        }
273        if (this.globalQueueSemaphore != null) {
274            this.globalQueueSemaphore.drainPermits();
275        }
276        if (this.globalTopicSemaphore != null) {
277            this.globalTopicSemaphore.drainPermits();
278        }
279        if (this.queueExecutor != null) {
280            ThreadPoolUtils.shutdownNow(queueExecutor);
281            queueExecutor = null;
282        }
283        if (this.topicExecutor != null) {
284            ThreadPoolUtils.shutdownNow(topicExecutor);
285            topicExecutor = null;
286        }
287        LOG.info("Stopped KahaDB");
288        super.doStop(stopper);
289    }
290
291    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
292        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
293            @Override
294            public Location execute(Transaction tx) throws IOException {
295                StoredDestination sd = getStoredDestination(destination, tx);
296                Long sequence = sd.messageIdIndex.get(tx, key);
297                if (sequence == null) {
298                    return null;
299                }
300                return sd.orderIndex.get(tx, sequence).location;
301            }
302        });
303    }
304
305    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
306        StoreQueueTask task = null;
307        synchronized (store.asyncTaskMap) {
308            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
309        }
310        return task;
311    }
312
313    // with asyncTaskMap locked
314    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
315        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
316        this.queueExecutor.execute(task);
317    }
318
319    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
320        StoreTopicTask task = null;
321        synchronized (store.asyncTaskMap) {
322            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
323        }
324        return task;
325    }
326
327    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
328        synchronized (store.asyncTaskMap) {
329            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
330        }
331        this.topicExecutor.execute(task);
332    }
333
334    @Override
335    public TransactionStore createTransactionStore() throws IOException {
336        return this.transactionStore;
337    }
338
339    public boolean getForceRecoverIndex() {
340        return this.forceRecoverIndex;
341    }
342
343    public void setForceRecoverIndex(boolean forceRecoverIndex) {
344        this.forceRecoverIndex = forceRecoverIndex;
345    }
346
347    public class KahaDBMessageStore extends AbstractMessageStore {
        /**
         * Pending async store tasks, keyed by message id plus destination.
         * Most accesses synchronize on the map itself; addQueueTask() relies on
         * the caller holding that lock.
         */
        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
        /** KahaDB form of the destination, produced by convert() in the constructor. */
        protected KahaDestination dest;
        /** Value of the outer store's getMaxAsyncJobs() at construction time. */
        private final int maxAsyncJobs;
        /** Per-destination semaphore created with maxAsyncJobs permits. */
        private final Semaphore localDestinationSemaphore;

        // Task counters; nothing in this class updates them —
        // presumably maintained by the store tasks (TODO confirm).
        double doneTasks, canceledTasks = 0;
354
355        public KahaDBMessageStore(ActiveMQDestination destination) {
356            super(destination);
357            this.dest = convert(destination);
358            this.maxAsyncJobs = getMaxAsyncJobs();
359            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
360        }
361
362        @Override
363        public ActiveMQDestination getDestination() {
364            return destination;
365        }
366
367        @Override
368        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
369                throws IOException {
370            if (isConcurrentStoreAndDispatchQueues()) {
371                message.beforeMarshall(wireFormat);
372                StoreQueueTask result = new StoreQueueTask(this, context, message);
373                ListenableFuture<Object> future = result.getFuture();
374                message.getMessageId().setFutureOrSequenceLong(future);
375                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
376                result.aquireLocks();
377                synchronized (asyncTaskMap) {
378                    addQueueTask(this, result);
379                    if (indexListener != null) {
380                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
381                    }
382                }
383                return future;
384            } else {
385                return super.asyncAddQueueMessage(context, message);
386            }
387        }
388
389        @Override
390        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
391            if (isConcurrentStoreAndDispatchQueues()) {
392                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
393                StoreQueueTask task = null;
394                synchronized (asyncTaskMap) {
395                    task = (StoreQueueTask) asyncTaskMap.get(key);
396                }
397                if (task != null) {
398                    if (ack.isInTransaction() || !task.cancel()) {
399                        try {
400                            task.future.get();
401                        } catch (InterruptedException e) {
402                            throw new InterruptedIOException(e.toString());
403                        } catch (Exception ignored) {
404                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
405                        }
406                        removeMessage(context, ack);
407                    } else {
408                        indexLock.writeLock().lock();
409                        try {
410                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
411                        } finally {
412                            indexLock.writeLock().unlock();
413                        }
414                        synchronized (asyncTaskMap) {
415                            asyncTaskMap.remove(key);
416                        }
417                    }
418                } else {
419                    removeMessage(context, ack);
420                }
421            } else {
422                removeMessage(context, ack);
423            }
424        }
425
426        @Override
427        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
428            final KahaAddMessageCommand command = new KahaAddMessageCommand();
429            command.setDestination(dest);
430            command.setMessageId(message.getMessageId().toProducerKey());
431            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
432            command.setPriority(message.getPriority());
433            command.setPrioritySupported(isPrioritizedMessages());
434            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
435            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
436            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
437                // sync add? (for async, future present from getFutureOrSequenceLong)
438                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
439
440                @Override
441                public void sequenceAssignedWithIndexLocked(final long sequence) {
442                    message.getMessageId().setFutureOrSequenceLong(sequence);
443                    if (indexListener != null) {
444                        if (possibleFuture == null) {
445                            trackPendingAdd(dest, sequence);
446                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
447                                @Override
448                                public void run() {
449                                    trackPendingAddComplete(dest, sequence);
450                                }
451                            }));
452                        }
453                    }
454                }
455            }, null);
456        }
457
458        @Override
459        public void updateMessage(Message message) throws IOException {
460            if (LOG.isTraceEnabled()) {
461                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
462            }
463            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
464            KahaAddMessageCommand command = new KahaAddMessageCommand();
465            command.setDestination(dest);
466            command.setMessageId(message.getMessageId().toProducerKey());
467            command.setPriority(message.getPriority());
468            command.setPrioritySupported(prioritizedMessages);
469            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
470            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
471            updateMessageCommand.setMessage(command);
472            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
473        }
474
475        @Override
476        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
477            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
478            command.setDestination(dest);
479            command.setMessageId(ack.getLastMessageId().toProducerKey());
480            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
481
482            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
483            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
484            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
485        }
486
487        @Override
488        public void removeAllMessages(ConnectionContext context) throws IOException {
489            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
490            command.setDestination(dest);
491            store(command, true, null, null);
492        }
493
494        @Override
495        public Message getMessage(MessageId identity) throws IOException {
496            final String key = identity.toProducerKey();
497
498            // Hopefully one day the page file supports concurrent read
499            // operations... but for now we must
500            // externally synchronize...
501            Location location;
502            indexLock.writeLock().lock();
503            try {
504                location = findMessageLocation(key, dest);
505            } finally {
506                indexLock.writeLock().unlock();
507            }
508            if (location == null) {
509                return null;
510            }
511
512            return loadMessage(location);
513        }
514
515        @Override
516        public int getMessageCount() throws IOException {
517            try {
518                lockAsyncJobQueue();
519                indexLock.writeLock().lock();
520                try {
521                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
522                        @Override
523                        public Integer execute(Transaction tx) throws IOException {
524                            // Iterate through all index entries to get a count
525                            // of messages in the destination.
526                            StoredDestination sd = getStoredDestination(dest, tx);
527                            int rc = 0;
528                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
529                                iterator.next();
530                                rc++;
531                            }
532                            return rc;
533                        }
534                    });
535                } finally {
536                    indexLock.writeLock().unlock();
537                }
538            } finally {
539                unlockAsyncJobQueue();
540            }
541        }
542
543        @Override
544        public boolean isEmpty() throws IOException {
545            indexLock.writeLock().lock();
546            try {
547                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
548                    @Override
549                    public Boolean execute(Transaction tx) throws IOException {
550                        // Iterate through all index entries to get a count of
551                        // messages in the destination.
552                        StoredDestination sd = getStoredDestination(dest, tx);
553                        return sd.locationIndex.isEmpty(tx);
554                    }
555                });
556            } finally {
557                indexLock.writeLock().unlock();
558            }
559        }
560
561        @Override
562        public void recover(final MessageRecoveryListener listener) throws Exception {
563            // recovery may involve expiry which will modify
564            indexLock.writeLock().lock();
565            try {
566                pageFile.tx().execute(new Transaction.Closure<Exception>() {
567                    @Override
568                    public void execute(Transaction tx) throws Exception {
569                        StoredDestination sd = getStoredDestination(dest, tx);
570                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
571                        sd.orderIndex.resetCursorPosition();
572                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
573                                .hasNext(); ) {
574                            Entry<Long, MessageKeys> entry = iterator.next();
575                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
576                                continue;
577                            }
578                            Message msg = loadMessage(entry.getValue().location);
579                            listener.recoverMessage(msg);
580                        }
581                    }
582                });
583            } finally {
584                indexLock.writeLock().unlock();
585            }
586        }
587
588        @Override
589        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
590            indexLock.writeLock().lock();
591            try {
592                pageFile.tx().execute(new Transaction.Closure<Exception>() {
593                    @Override
594                    public void execute(Transaction tx) throws Exception {
595                        StoredDestination sd = getStoredDestination(dest, tx);
596                        Entry<Long, MessageKeys> entry = null;
597                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
598                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
599                            entry = iterator.next();
600                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
601                                continue;
602                            }
603                            Message msg = loadMessage(entry.getValue().location);
604                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
605                            listener.recoverMessage(msg);
606                            counter++;
607                            if (counter >= maxReturned) {
608                                break;
609                            }
610                        }
611                        sd.orderIndex.stoppedIterating();
612                    }
613                });
614            } finally {
615                indexLock.writeLock().unlock();
616            }
617        }
618
619        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
620            int counter = 0;
621            String id;
622            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
623                id = iterator.next();
624                iterator.remove();
625                Long sequence = sd.messageIdIndex.get(tx, id);
626                if (sequence != null) {
627                    if (sd.orderIndex.alreadyDispatched(sequence)) {
628                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
629                        counter++;
630                        if (counter >= maxReturned) {
631                            break;
632                        }
633                    } else {
634                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
635                    }
636                } else {
637                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
638                }
639            }
640            return counter;
641        }
642
643
644        @Override
645        public void resetBatching() {
646            if (pageFile.isLoaded()) {
647                indexLock.writeLock().lock();
648                try {
649                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
650                        @Override
651                        public void execute(Transaction tx) throws Exception {
652                            StoredDestination sd = getExistingStoredDestination(dest, tx);
653                            if (sd != null) {
654                                sd.orderIndex.resetCursorPosition();}
655                            }
656                        });
657                } catch (Exception e) {
658                    LOG.error("Failed to reset batching",e);
659                } finally {
660                    indexLock.writeLock().unlock();
661                }
662            }
663        }
664
665        @Override
666        public void setBatch(final MessageId identity) throws IOException {
667            indexLock.writeLock().lock();
668            try {
669                pageFile.tx().execute(new Transaction.Closure<IOException>() {
670                    @Override
671                    public void execute(Transaction tx) throws IOException {
672                        StoredDestination sd = getStoredDestination(dest, tx);
673                        Long location = (Long) identity.getFutureOrSequenceLong();
674                        Long pending = sd.orderIndex.minPendingAdd();
675                        if (pending != null) {
676                            location = Math.min(location, pending-1);
677                        }
678                        sd.orderIndex.setBatch(tx, location);
679                    }
680                });
681            } finally {
682                indexLock.writeLock().unlock();
683            }
684        }
685
        /**
         * No-op: this store ignores the supplied memory usage.
         *
         * @param memoryUsage unused
         */
        @Override
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }
        /**
         * Starts the store by delegating to the superclass; adds no behaviour of its own.
         *
         * @throws Exception if the superclass start fails
         */
        @Override
        public void start() throws Exception {
            super.start();
        }
        /**
         * Stops the store by delegating to the superclass; adds no behaviour of its own.
         *
         * @throws Exception if the superclass stop fails
         */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
697
698        protected void lockAsyncJobQueue() {
699            try {
700                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
701                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
702                }
703            } catch (Exception e) {
704                LOG.error("Failed to lock async jobs for " + this.destination, e);
705            }
706        }
707
        /**
         * Releases {@code maxAsyncJobs} permits on the per-destination semaphore.
         * The release is unconditional: it does not check whether the permits
         * were actually obtained beforehand.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
711
712        protected void acquireLocalAsyncLock() {
713            try {
714                this.localDestinationSemaphore.acquire();
715            } catch (InterruptedException e) {
716                LOG.error("Failed to aquire async lock for " + this.destination, e);
717            }
718        }
719
        /**
         * Returns a single permit to the per-destination semaphore.
         */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
723
724        @Override
725        public String toString(){
726            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
727        }
728    }
729
730    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
731        private final AtomicInteger subscriptionCount = new AtomicInteger();
732        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
733            super(destination);
734            this.subscriptionCount.set(getAllSubscriptions().length);
735            if (isConcurrentStoreAndDispatchTopics()) {
736                asyncTopicMaps.add(asyncTaskMap);
737            }
738        }
739
740        @Override
741        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
742                throws IOException {
743            if (isConcurrentStoreAndDispatchTopics()) {
744                message.beforeMarshall(wireFormat);
745                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
746                result.aquireLocks();
747                addTopicTask(this, result);
748                return result.getFuture();
749            } else {
750                return super.asyncAddTopicMessage(context, message);
751            }
752        }
753
754        @Override
755        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
756                                MessageId messageId, MessageAck ack) throws IOException {
757            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
758            if (isConcurrentStoreAndDispatchTopics()) {
759                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
760                StoreTopicTask task = null;
761                synchronized (asyncTaskMap) {
762                    task = (StoreTopicTask) asyncTaskMap.get(key);
763                }
764                if (task != null) {
765                    if (task.addSubscriptionKey(subscriptionKey)) {
766                        removeTopicTask(this, messageId);
767                        if (task.cancel()) {
768                            synchronized (asyncTaskMap) {
769                                asyncTaskMap.remove(key);
770                            }
771                        }
772                    }
773                } else {
774                    doAcknowledge(context, subscriptionKey, messageId, ack);
775                }
776            } else {
777                doAcknowledge(context, subscriptionKey, messageId, ack);
778            }
779        }
780
781        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
782                throws IOException {
783            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
784            command.setDestination(dest);
785            command.setSubscriptionKey(subscriptionKey);
786            command.setMessageId(messageId.toProducerKey());
787            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
788            if (ack != null && ack.isUnmatchedAck()) {
789                command.setAck(UNMATCHED);
790            } else {
791                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
792                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
793            }
794            store(command, false, null, null);
795        }
796
797        @Override
798        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
799            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
800                    .getSubscriptionName());
801            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
802            command.setDestination(dest);
803            command.setSubscriptionKey(subscriptionKey.toString());
804            command.setRetroactive(retroactive);
805            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
806            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
807            store(command, isEnableJournalDiskSyncs() && true, null, null);
808            this.subscriptionCount.incrementAndGet();
809        }
810
811        @Override
812        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
813            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
814            command.setDestination(dest);
815            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
816            store(command, isEnableJournalDiskSyncs() && true, null, null);
817            this.subscriptionCount.decrementAndGet();
818        }
819
820        @Override
821        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
822
823            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
824            indexLock.writeLock().lock();
825            try {
826                pageFile.tx().execute(new Transaction.Closure<IOException>() {
827                    @Override
828                    public void execute(Transaction tx) throws IOException {
829                        StoredDestination sd = getStoredDestination(dest, tx);
830                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
831                                .hasNext();) {
832                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
833                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
834                                    .getValue().getSubscriptionInfo().newInput()));
835                            subscriptions.add(info);
836
837                        }
838                    }
839                });
840            } finally {
841                indexLock.writeLock().unlock();
842            }
843
844            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
845            subscriptions.toArray(rc);
846            return rc;
847        }
848
849        @Override
850        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
851            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
852            indexLock.writeLock().lock();
853            try {
854                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
855                    @Override
856                    public SubscriptionInfo execute(Transaction tx) throws IOException {
857                        StoredDestination sd = getStoredDestination(dest, tx);
858                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
859                        if (command == null) {
860                            return null;
861                        }
862                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
863                                .getSubscriptionInfo().newInput()));
864                    }
865                });
866            } finally {
867                indexLock.writeLock().unlock();
868            }
869        }
870
871        @Override
872        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
873            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
874            indexLock.writeLock().lock();
875            try {
876                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
877                    @Override
878                    public Integer execute(Transaction tx) throws IOException {
879                        StoredDestination sd = getStoredDestination(dest, tx);
880                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
881                        if (cursorPos == null) {
882                            // The subscription might not exist.
883                            return 0;
884                        }
885
886                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
887                    }
888                });
889            } finally {
890                indexLock.writeLock().unlock();
891            }
892        }
893
894        @Override
895        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
896                throws Exception {
897            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
898            @SuppressWarnings("unused")
899            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
900            indexLock.writeLock().lock();
901            try {
902                pageFile.tx().execute(new Transaction.Closure<Exception>() {
903                    @Override
904                    public void execute(Transaction tx) throws Exception {
905                        StoredDestination sd = getStoredDestination(dest, tx);
906                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
907                        sd.orderIndex.setBatch(tx, cursorPos);
908                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
909                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
910                                .hasNext();) {
911                            Entry<Long, MessageKeys> entry = iterator.next();
912                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
913                                continue;
914                            }
915                            listener.recoverMessage(loadMessage(entry.getValue().location));
916                        }
917                        sd.orderIndex.resetCursorPosition();
918                    }
919                });
920            } finally {
921                indexLock.writeLock().unlock();
922            }
923        }
924
925        @Override
926        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
927                final MessageRecoveryListener listener) throws Exception {
928            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
929            @SuppressWarnings("unused")
930            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
931            indexLock.writeLock().lock();
932            try {
933                pageFile.tx().execute(new Transaction.Closure<Exception>() {
934                    @Override
935                    public void execute(Transaction tx) throws Exception {
936                        StoredDestination sd = getStoredDestination(dest, tx);
937                        sd.orderIndex.resetCursorPosition();
938                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
939                        if (moc == null) {
940                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
941                            if (pos == null) {
942                                // sub deleted
943                                return;
944                            }
945                            sd.orderIndex.setBatch(tx, pos);
946                            moc = sd.orderIndex.cursor;
947                        } else {
948                            sd.orderIndex.cursor.sync(moc);
949                        }
950
951                        Entry<Long, MessageKeys> entry = null;
952                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
953                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
954                                .hasNext();) {
955                            entry = iterator.next();
956                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
957                                continue;
958                            }
959                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
960                                counter++;
961                            }
962                            if (counter >= maxReturned || listener.hasSpace() == false) {
963                                break;
964                            }
965                        }
966                        sd.orderIndex.stoppedIterating();
967                        if (entry != null) {
968                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
969                            sd.subscriptionCursors.put(subscriptionKey, copy);
970                        }
971                    }
972                });
973            } finally {
974                indexLock.writeLock().unlock();
975            }
976        }
977
978        @Override
979        public void resetBatching(String clientId, String subscriptionName) {
980            try {
981                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
982                indexLock.writeLock().lock();
983                try {
984                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
985                        @Override
986                        public void execute(Transaction tx) throws IOException {
987                            StoredDestination sd = getStoredDestination(dest, tx);
988                            sd.subscriptionCursors.remove(subscriptionKey);
989                        }
990                    });
991                }finally {
992                    indexLock.writeLock().unlock();
993                }
994            } catch (IOException e) {
995                throw new RuntimeException(e);
996            }
997        }
998    }
999
1000    String subscriptionKey(String clientId, String subscriptionName) {
1001        return clientId + ":" + subscriptionName;
1002    }
1003
1004    @Override
1005    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1006        return this.transactionStore.proxy(new KahaDBMessageStore(destination));
1007    }
1008
1009    @Override
1010    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1011        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1012    }
1013
1014    /**
1015     * Cleanup method to remove any state associated with the given destination.
1016     * This method does not stop the message store (it might not be cached).
1017     *
1018     * @param destination
1019     *            Destination to forget
1020     */
1021    @Override
1022    public void removeQueueMessageStore(ActiveMQQueue destination) {
1023    }
1024
1025    /**
1026     * Cleanup method to remove any state associated with the given destination
1027     * This method does not stop the message store (it might not be cached).
1028     *
1029     * @param destination
1030     *            Destination to forget
1031     */
1032    @Override
1033    public void removeTopicMessageStore(ActiveMQTopic destination) {
1034    }
1035
    /**
     * Records a request to delete all messages by setting the
     * {@code deleteAllMessages} flag. Nothing is deleted by this method itself.
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1040
1041    @Override
1042    public Set<ActiveMQDestination> getDestinations() {
1043        try {
1044            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1045            indexLock.writeLock().lock();
1046            try {
1047                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1048                    @Override
1049                    public void execute(Transaction tx) throws IOException {
1050                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1051                                .hasNext();) {
1052                            Entry<String, StoredDestination> entry = iterator.next();
1053                            //Removing isEmpty topic check - see AMQ-5875
1054                            rc.add(convert(entry.getKey()));
1055                        }
1056                    }
1057                });
1058            }finally {
1059                indexLock.writeLock().unlock();
1060            }
1061            return rc;
1062        } catch (IOException e) {
1063            throw new RuntimeException(e);
1064        }
1065    }
1066
    /**
     * Always returns 0; this implementation does not compute a value.
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1071
1072    @Override
1073    public long getLastProducerSequenceId(ProducerId id) {
1074        indexLock.writeLock().lock();
1075        try {
1076            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1077        } finally {
1078            indexLock.writeLock().unlock();
1079        }
1080    }
1081
1082    @Override
1083    public long size() {
1084        try {
1085            return journalSize.get() + getPageFile().getDiskSize();
1086        } catch (IOException e) {
1087            throw new RuntimeException(e);
1088        }
1089    }
1090
    /**
     * Not supported by this adapter.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this adapter.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this adapter.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1103
    /**
     * Runs a checkpoint by delegating to the superclass's
     * {@code checkpointCleanup}.
     *
     * @param sync passed through unchanged to {@code checkpointCleanup}
     * @throws IOException if the checkpoint fails
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1108
1109    // /////////////////////////////////////////////////////////////////
1110    // Internal helper methods.
1111    // /////////////////////////////////////////////////////////////////
1112
1113    /**
1114     * @param location
1115     * @return
1116     * @throws IOException
1117     */
1118    Message loadMessage(Location location) throws IOException {
1119        try {
1120            JournalCommand<?> command = load(location);
1121            KahaAddMessageCommand addMessage = null;
1122            switch (command.type()) {
1123                case KAHA_UPDATE_MESSAGE_COMMAND:
1124                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1125                    break;
1126                default:
1127                    addMessage = (KahaAddMessageCommand) command;
1128            }
1129            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1130            return msg;
1131        } catch (IOException ioe) {
1132            LOG.error("Failed to load message at: {}", location , ioe);
1133            brokerService.handleIOException(ioe);
1134            throw ioe;
1135        }
1136    }
1137
1138    // /////////////////////////////////////////////////////////////////
1139    // Internal conversion methods.
1140    // /////////////////////////////////////////////////////////////////
1141
1142    KahaLocation convert(Location location) {
1143        KahaLocation rc = new KahaLocation();
1144        rc.setLogId(location.getDataFileId());
1145        rc.setOffset(location.getOffset());
1146        return rc;
1147    }
1148
1149    KahaDestination convert(ActiveMQDestination dest) {
1150        KahaDestination rc = new KahaDestination();
1151        rc.setName(dest.getPhysicalName());
1152        switch (dest.getDestinationType()) {
1153        case ActiveMQDestination.QUEUE_TYPE:
1154            rc.setType(DestinationType.QUEUE);
1155            return rc;
1156        case ActiveMQDestination.TOPIC_TYPE:
1157            rc.setType(DestinationType.TOPIC);
1158            return rc;
1159        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1160            rc.setType(DestinationType.TEMP_QUEUE);
1161            return rc;
1162        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1163            rc.setType(DestinationType.TEMP_TOPIC);
1164            return rc;
1165        default:
1166            return null;
1167        }
1168    }
1169
1170    ActiveMQDestination convert(String dest) {
1171        int p = dest.indexOf(":");
1172        if (p < 0) {
1173            throw new IllegalArgumentException("Not in the valid destination format");
1174        }
1175        int type = Integer.parseInt(dest.substring(0, p));
1176        String name = dest.substring(p + 1);
1177        return convert(type, name);
1178    }
1179
1180    private ActiveMQDestination convert(KahaDestination commandDestination) {
1181        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1182    }
1183
1184    private ActiveMQDestination convert(int type, String name) {
1185        switch (KahaDestination.DestinationType.valueOf(type)) {
1186        case QUEUE:
1187            return new ActiveMQQueue(name);
1188        case TOPIC:
1189            return new ActiveMQTopic(name);
1190        case TEMP_QUEUE:
1191            return new ActiveMQTempQueue(name);
1192        case TEMP_TOPIC:
1193            return new ActiveMQTempTopic(name);
1194        default:
1195            throw new IllegalArgumentException("Not in the valid destination format");
1196        }
1197    }
1198
    /**
     * @return the transformer currently applied to transaction ids
     */
    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }
1202
    /**
     * Replaces the transformer applied to transaction ids.
     *
     * @param transactionIdTransformer the transformer to use from now on
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1206
1207    static class AsyncJobKey {
1208        MessageId id;
1209        ActiveMQDestination destination;
1210
1211        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1212            this.id = id;
1213            this.destination = destination;
1214        }
1215
1216        @Override
1217        public boolean equals(Object obj) {
1218            if (obj == this) {
1219                return true;
1220            }
1221            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1222                    && destination.equals(((AsyncJobKey) obj).destination);
1223        }
1224
1225        @Override
1226        public int hashCode() {
1227            return id.hashCode() + destination.hashCode();
1228        }
1229
1230        @Override
1231        public String toString() {
1232            return destination.getPhysicalName() + "-" + id;
1233        }
1234    }
1235
1236    public interface StoreTask {
1237        public boolean cancel();
1238
1239        public void aquireLocks();
1240
1241        public void releaseLocks();
1242    }
1243
1244    class StoreQueueTask implements Runnable, StoreTask {
1245        protected final Message message;
1246        protected final ConnectionContext context;
1247        protected final KahaDBMessageStore store;
1248        protected final InnerFutureTask future;
1249        protected final AtomicBoolean done = new AtomicBoolean();
1250        protected final AtomicBoolean locked = new AtomicBoolean();
1251
1252        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1253            this.store = store;
1254            this.context = context;
1255            this.message = message;
1256            this.future = new InnerFutureTask(this);
1257        }
1258
1259        public ListenableFuture<Object> getFuture() {
1260            return this.future;
1261        }
1262
1263        @Override
1264        public boolean cancel() {
1265            if (this.done.compareAndSet(false, true)) {
1266                return this.future.cancel(false);
1267            }
1268            return false;
1269        }
1270
1271        @Override
1272        public void aquireLocks() {
1273            if (this.locked.compareAndSet(false, true)) {
1274                try {
1275                    globalQueueSemaphore.acquire();
1276                    store.acquireLocalAsyncLock();
1277                    message.incrementReferenceCount();
1278                } catch (InterruptedException e) {
1279                    LOG.warn("Failed to aquire lock", e);
1280                }
1281            }
1282
1283        }
1284
1285        @Override
1286        public void releaseLocks() {
1287            if (this.locked.compareAndSet(true, false)) {
1288                store.releaseLocalAsyncLock();
1289                globalQueueSemaphore.release();
1290                message.decrementReferenceCount();
1291            }
1292        }
1293
1294        @Override
1295        public void run() {
1296            this.store.doneTasks++;
1297            try {
1298                if (this.done.compareAndSet(false, true)) {
1299                    this.store.addMessage(context, message);
1300                    removeQueueTask(this.store, this.message.getMessageId());
1301                    this.future.complete();
1302                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
1303                    System.err.println(this.store.dest.getName() + " cancelled: "
1304                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1305                    this.store.canceledTasks = this.store.doneTasks = 0;
1306                }
1307            } catch (Exception e) {
1308                this.future.setException(e);
1309            }
1310        }
1311
1312        protected Message getMessage() {
1313            return this.message;
1314        }
1315
1316        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1317
1318            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
1319
1320            public InnerFutureTask(Runnable runnable) {
1321                super(runnable, null);
1322            }
1323
1324            public void setException(final Exception e) {
1325                super.setException(e);
1326            }
1327
1328            public void complete() {
1329                super.set(null);
1330            }
1331
1332            @Override
1333            public void done() {
1334                fireListener();
1335            }
1336
1337            @Override
1338            public void addListener(Runnable listener) {
1339                this.listenerRef.set(listener);
1340                if (isDone()) {
1341                    fireListener();
1342                }
1343            }
1344
1345            private void fireListener() {
1346                Runnable listener = listenerRef.getAndSet(null);
1347                if (listener != null) {
1348                    try {
1349                        listener.run();
1350                    } catch (Exception ignored) {
1351                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1352                    }
1353                }
1354            }
1355        }
1356    }
1357
1358    class StoreTopicTask extends StoreQueueTask {
1359        private final int subscriptionCount;
1360        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1361        private final KahaDBTopicMessageStore topicStore;
1362        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1363                int subscriptionCount) {
1364            super(store, context, message);
1365            this.topicStore = store;
1366            this.subscriptionCount = subscriptionCount;
1367
1368        }
1369
1370        @Override
1371        public void aquireLocks() {
1372            if (this.locked.compareAndSet(false, true)) {
1373                try {
1374                    globalTopicSemaphore.acquire();
1375                    store.acquireLocalAsyncLock();
1376                    message.incrementReferenceCount();
1377                } catch (InterruptedException e) {
1378                    LOG.warn("Failed to aquire lock", e);
1379                }
1380            }
1381        }
1382
1383        @Override
1384        public void releaseLocks() {
1385            if (this.locked.compareAndSet(true, false)) {
1386                message.decrementReferenceCount();
1387                store.releaseLocalAsyncLock();
1388                globalTopicSemaphore.release();
1389            }
1390        }
1391
1392        /**
1393         * add a key
1394         *
1395         * @param key
1396         * @return true if all acknowledgements received
1397         */
1398        public boolean addSubscriptionKey(String key) {
1399            synchronized (this.subscriptionKeys) {
1400                this.subscriptionKeys.add(key);
1401            }
1402            return this.subscriptionKeys.size() >= this.subscriptionCount;
1403        }
1404
1405        @Override
1406        public void run() {
1407            this.store.doneTasks++;
1408            try {
1409                if (this.done.compareAndSet(false, true)) {
1410                    this.topicStore.addMessage(context, message);
1411                    // apply any acks we have
1412                    synchronized (this.subscriptionKeys) {
1413                        for (String key : this.subscriptionKeys) {
1414                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1415
1416                        }
1417                    }
1418                    removeTopicTask(this.topicStore, this.message.getMessageId());
1419                    this.future.complete();
1420                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1421                    System.err.println(this.store.dest.getName() + " cancelled: "
1422                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1423                    this.store.canceledTasks = this.store.doneTasks = 0;
1424                }
1425            } catch (Exception e) {
1426                this.future.setException(e);
1427            }
1428        }
1429    }
1430
1431    public class StoreTaskExecutor extends ThreadPoolExecutor {
1432
1433        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1434            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1435        }
1436
1437        @Override
1438        protected void afterExecute(Runnable runnable, Throwable throwable) {
1439            super.afterExecute(runnable, throwable);
1440
1441            if (runnable instanceof StoreTask) {
1442               ((StoreTask)runnable).releaseLocks();
1443            }
1444        }
1445    }
1446
    /**
     * Creates a new {@link JobSchedulerStoreImpl}; every call returns a fresh instance.
     */
    @Override
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        return new JobSchedulerStoreImpl();
    }
1451}