001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicReference;
043
044import org.apache.activemq.broker.ConnectionContext;
045import org.apache.activemq.broker.region.BaseDestination;
046import org.apache.activemq.broker.scheduler.JobSchedulerStore;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQQueue;
049import org.apache.activemq.command.ActiveMQTempQueue;
050import org.apache.activemq.command.ActiveMQTempTopic;
051import org.apache.activemq.command.ActiveMQTopic;
052import org.apache.activemq.command.Message;
053import org.apache.activemq.command.MessageAck;
054import org.apache.activemq.command.MessageId;
055import org.apache.activemq.command.ProducerId;
056import org.apache.activemq.command.SubscriptionInfo;
057import org.apache.activemq.command.TransactionId;
058import org.apache.activemq.openwire.OpenWireFormat;
059import org.apache.activemq.protobuf.Buffer;
060import org.apache.activemq.store.AbstractMessageStore;
061import org.apache.activemq.store.IndexListener;
062import org.apache.activemq.store.ListenableFuture;
063import org.apache.activemq.store.MessageRecoveryListener;
064import org.apache.activemq.store.MessageStore;
065import org.apache.activemq.store.PersistenceAdapter;
066import org.apache.activemq.store.ProxyMessageStore;
067import org.apache.activemq.store.TopicMessageStore;
068import org.apache.activemq.store.TransactionIdTransformer;
069import org.apache.activemq.store.TransactionStore;
070import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
071import org.apache.activemq.store.kahadb.data.KahaDestination;
072import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
073import org.apache.activemq.store.kahadb.data.KahaLocation;
074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
076import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
077import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
078import org.apache.activemq.store.kahadb.disk.journal.Location;
079import org.apache.activemq.store.kahadb.disk.page.Transaction;
080import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
081import org.apache.activemq.usage.MemoryUsage;
082import org.apache.activemq.usage.SystemUsage;
083import org.apache.activemq.util.IOExceptionSupport;
084import org.apache.activemq.util.ServiceStopper;
085import org.apache.activemq.util.ThreadPoolUtils;
086import org.apache.activemq.wireformat.WireFormat;
087import org.slf4j.Logger;
088import org.slf4j.LoggerFactory;
089
090public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
091    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
092    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
093
094    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
095    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
096            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
097    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
098    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
099            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
100
101    protected ExecutorService queueExecutor;
102    protected ExecutorService topicExecutor;
103    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
104    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
105    final WireFormat wireFormat = new OpenWireFormat();
106    private SystemUsage usageManager;
107    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
108    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
109    Semaphore globalQueueSemaphore;
110    Semaphore globalTopicSemaphore;
111    private boolean concurrentStoreAndDispatchQueues = true;
    // when true, message order may be compromised when cache is exhausted if store is out
    // of order w.r.t cache
114    private boolean concurrentStoreAndDispatchTopics = false;
115    private final boolean concurrentStoreAndDispatchTransactions = false;
116    private int maxAsyncJobs = MAX_ASYNC_JOBS;
117    private final KahaDBTransactionStore transactionStore;
118    private TransactionIdTransformer transactionIdTransformer;
119
120    public KahaDBStore() {
121        this.transactionStore = new KahaDBTransactionStore(this);
122        this.transactionIdTransformer = new TransactionIdTransformer() {
123            @Override
124            public TransactionId transform(TransactionId txid) {
125                return txid;
126            }
127        };
128    }
129
130    @Override
131    public String toString() {
132        return "KahaDB:[" + directory.getAbsolutePath() + "]";
133    }
134
    /**
     * Intentionally a no-op: the supplied broker name is ignored by this store.
     *
     * @param brokerName the broker name (unused)
     */
    @Override
    public void setBrokerName(String brokerName) {
    }
138
    /**
     * Records the usage manager so it can later be read back through {@link #getUsageManager()}.
     *
     * @param usageManager the usage manager to remember
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
    }
143
144    public SystemUsage getUsageManager() {
145        return this.usageManager;
146    }
147
148    /**
149     * @return the concurrentStoreAndDispatch
150     */
151    public boolean isConcurrentStoreAndDispatchQueues() {
152        return this.concurrentStoreAndDispatchQueues;
153    }
154
    /**
     * Switches asynchronous handling of queue messages on or off. The flag is consulted on every call to
     * the queue store's {@code asyncAddQueueMessage} and {@code removeAsyncMessage}.
     *
     * @param concurrentStoreAndDispatch
     *            {@code true} to use the asynchronous path for queues, {@code false} to disable it
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
    }
162
163    /**
164     * @return the concurrentStoreAndDispatch
165     */
166    public boolean isConcurrentStoreAndDispatchTopics() {
167        return this.concurrentStoreAndDispatchTopics;
168    }
169
    /**
     * Sets the flag reported by {@link #isConcurrentStoreAndDispatchTopics()}. As noted on the backing
     * field, enabling it may compromise message order when the cache is exhausted and the store is out
     * of order with respect to the cache.
     *
     * @param concurrentStoreAndDispatch
     *            {@code true} to enable concurrent store and dispatch for topics
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
    }
177
178    public boolean isConcurrentStoreAndDispatchTransactions() {
179        return this.concurrentStoreAndDispatchTransactions;
180    }
181
182    /**
183     * @return the maxAsyncJobs
184     */
185    public int getMaxAsyncJobs() {
186        return this.maxAsyncJobs;
187    }
188
    /**
     * Sets the number of async jobs. {@link #doStart()} reads this value when it creates the global
     * semaphores and the bounded job queues, and each {@code KahaDBMessageStore} reads it on construction,
     * so a change made afterwards does not resize what has already been created.
     * NOTE(review): {@code doStop} acquires {@code this.maxAsyncJobs} permits; raising the value after
     * start looks like it would make that acquire wait out its full timeout - confirm.
     *
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.maxAsyncJobs = maxAsyncJobs;
    }
196
197    @Override
198    public void doStart() throws Exception {
199        if (brokerService != null) {
200            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
201            wireFormat.setVersion(metadata.openwireVersion);
202
203            if (LOG.isDebugEnabled()) {
204                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
205            }
206
207        }
208        super.doStart();
209
210        if (brokerService != null) {
211            // In case the recovered store used a different OpenWire version log a warning
212            // to assist in determining why journal reads fail.
213            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
214                LOG.warn("Recovered Store uses a different OpenWire version[{}] " +
215                         "than the version configured[{}].",
216                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
217            }
218        }
219
220        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
221        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
222        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
223        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
224        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
225            asyncQueueJobQueue, new ThreadFactory() {
226                @Override
227                public Thread newThread(Runnable runnable) {
228                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
229                    thread.setDaemon(true);
230                    return thread;
231                }
232            });
233        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
234            asyncTopicJobQueue, new ThreadFactory() {
235                @Override
236                public Thread newThread(Runnable runnable) {
237                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
238                    thread.setDaemon(true);
239                    return thread;
240                }
241            });
242    }
243
244    @Override
245    public void doStop(ServiceStopper stopper) throws Exception {
246        // drain down async jobs
247        LOG.info("Stopping async queue tasks");
248        if (this.globalQueueSemaphore != null) {
249            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
250        }
251        synchronized (this.asyncQueueMaps) {
252            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
253                synchronized (m) {
254                    for (StoreTask task : m.values()) {
255                        task.cancel();
256                    }
257                }
258            }
259            this.asyncQueueMaps.clear();
260        }
261        LOG.info("Stopping async topic tasks");
262        if (this.globalTopicSemaphore != null) {
263            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
264        }
265        synchronized (this.asyncTopicMaps) {
266            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
267                synchronized (m) {
268                    for (StoreTask task : m.values()) {
269                        task.cancel();
270                    }
271                }
272            }
273            this.asyncTopicMaps.clear();
274        }
275        if (this.globalQueueSemaphore != null) {
276            this.globalQueueSemaphore.drainPermits();
277        }
278        if (this.globalTopicSemaphore != null) {
279            this.globalTopicSemaphore.drainPermits();
280        }
281        if (this.queueExecutor != null) {
282            ThreadPoolUtils.shutdownNow(queueExecutor);
283            queueExecutor = null;
284        }
285        if (this.topicExecutor != null) {
286            ThreadPoolUtils.shutdownNow(topicExecutor);
287            topicExecutor = null;
288        }
289        LOG.info("Stopped KahaDB");
290        super.doStop(stopper);
291    }
292
293    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
294        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
295            @Override
296            public Location execute(Transaction tx) throws IOException {
297                StoredDestination sd = getStoredDestination(destination, tx);
298                Long sequence = sd.messageIdIndex.get(tx, key);
299                if (sequence == null) {
300                    return null;
301                }
302                return sd.orderIndex.get(tx, sequence).location;
303            }
304        });
305    }
306
307    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
308        StoreQueueTask task = null;
309        synchronized (store.asyncTaskMap) {
310            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
311        }
312        return task;
313    }
314
315    // with asyncTaskMap locked
316    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
317        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
318        this.queueExecutor.execute(task);
319    }
320
321    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
322        StoreTopicTask task = null;
323        synchronized (store.asyncTaskMap) {
324            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
325        }
326        return task;
327    }
328
329    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
330        synchronized (store.asyncTaskMap) {
331            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
332        }
333        this.topicExecutor.execute(task);
334    }
335
336    @Override
337    public TransactionStore createTransactionStore() throws IOException {
338        return this.transactionStore;
339    }
340
341    public boolean getForceRecoverIndex() {
342        return this.forceRecoverIndex;
343    }
344
    /**
     * Sets the {@code forceRecoverIndex} flag reported by {@link #getForceRecoverIndex()}.
     *
     * @param forceRecoverIndex the new flag value
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }
348
349    public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException {
350        if (preparedAcks != null) {
351            Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
352            for (MessageAck ack : preparedAcks) {
353                stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
354            }
355            ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
356            for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
357                for (MessageAck ack : preparedAcks) {
358                    if (entry.getKey().equals(ack.getDestination())) {
359                        perStoreAcks.add(ack);
360                    }
361                }
362                entry.getValue().forgetRecoveredAcks(perStoreAcks, isRollback);
363                perStoreAcks.clear();
364            }
365        }
366    }
367
368    public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException {
369        Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
370        for (MessageAck ack : preparedAcks) {
371            stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
372        }
373        ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
374        for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
375            for (MessageAck ack : preparedAcks) {
376                if (entry.getKey().equals(ack.getDestination())) {
377                    perStoreAcks.add(ack);
378                }
379            }
380            entry.getValue().trackRecoveredAcks(perStoreAcks);
381            perStoreAcks.clear();
382        }
383    }
384
385    private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
386        ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination));
387        if (store == null) {
388            if (activeMQDestination.isQueue()) {
389                store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);
390            } else {
391                store = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination);
392            }
393        }
394        return (KahaDBMessageStore) store.getDelegate();
395    }
396
397    public class KahaDBMessageStore extends AbstractMessageStore {
398        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
399        protected KahaDestination dest;
400        private final int maxAsyncJobs;
401        private final Semaphore localDestinationSemaphore;
402        protected final Set<String> ackedAndPrepared = new HashSet<>();
403        protected final Set<String> rolledBackAcks = new HashSet<>();
404
405        double doneTasks, canceledTasks = 0;
406
407        public KahaDBMessageStore(ActiveMQDestination destination) {
408            super(destination);
409            this.dest = convert(destination);
410            this.maxAsyncJobs = getMaxAsyncJobs();
411            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
412        }
413
414        @Override
415        public ActiveMQDestination getDestination() {
416            return destination;
417        }
418
419
420        // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
421        // till then they are skipped by the store.
422        // 'at most once' XA guarantee
423        public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
424            indexLock.writeLock().lock();
425            try {
426                for (MessageAck ack : acks) {
427                    ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
428                }
429            } finally {
430                indexLock.writeLock().unlock();
431            }
432        }
433
434        public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
435            if (acks != null) {
436                indexLock.writeLock().lock();
437                try {
438                    for (MessageAck ack : acks) {
439                        final String id = ack.getLastMessageId().toProducerKey();
440                        ackedAndPrepared.remove(id);
441                        if (rollback) {
442                            rolledBackAcks.add(id);
443                            //incrementAndAddSizeToStoreStat(dest, 0);
444                        }
445                    }
446                } finally {
447                    indexLock.writeLock().unlock();
448                }
449            }
450        }
451
452        @Override
453        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
454                throws IOException {
455            if (isConcurrentStoreAndDispatchQueues()) {
456                message.beforeMarshall(wireFormat);
457                StoreQueueTask result = new StoreQueueTask(this, context, message);
458                ListenableFuture<Object> future = result.getFuture();
459                message.getMessageId().setFutureOrSequenceLong(future);
460                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
461                result.aquireLocks();
462                synchronized (asyncTaskMap) {
463                    addQueueTask(this, result);
464                    if (indexListener != null) {
465                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
466                    }
467                }
468                return future;
469            } else {
470                return super.asyncAddQueueMessage(context, message);
471            }
472        }
473
474        @Override
475        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
476            if (isConcurrentStoreAndDispatchQueues()) {
477                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
478                StoreQueueTask task = null;
479                synchronized (asyncTaskMap) {
480                    task = (StoreQueueTask) asyncTaskMap.get(key);
481                }
482                if (task != null) {
483                    if (ack.isInTransaction() || !task.cancel()) {
484                        try {
485                            task.future.get();
486                        } catch (InterruptedException e) {
487                            throw new InterruptedIOException(e.toString());
488                        } catch (Exception ignored) {
489                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
490                        }
491                        removeMessage(context, ack);
492                    } else {
493                        indexLock.writeLock().lock();
494                        try {
495                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
496                        } finally {
497                            indexLock.writeLock().unlock();
498                        }
499                        synchronized (asyncTaskMap) {
500                            asyncTaskMap.remove(key);
501                        }
502                    }
503                } else {
504                    removeMessage(context, ack);
505                }
506            } else {
507                removeMessage(context, ack);
508            }
509        }
510
511        @Override
512        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
513            final KahaAddMessageCommand command = new KahaAddMessageCommand();
514            command.setDestination(dest);
515            command.setMessageId(message.getMessageId().toProducerKey());
516            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
517            command.setPriority(message.getPriority());
518            command.setPrioritySupported(isPrioritizedMessages());
519            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
520            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
521            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
522                // sync add? (for async, future present from getFutureOrSequenceLong)
523                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
524
525                @Override
526                public void sequenceAssignedWithIndexLocked(final long sequence) {
527                    message.getMessageId().setFutureOrSequenceLong(sequence);
528                    if (indexListener != null) {
529                        if (possibleFuture == null) {
530                            trackPendingAdd(dest, sequence);
531                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
532                                @Override
533                                public void run() {
534                                    trackPendingAddComplete(dest, sequence);
535                                }
536                            }));
537                        }
538                    }
539                }
540            }, null);
541        }
542
543        @Override
544        public void updateMessage(Message message) throws IOException {
545            if (LOG.isTraceEnabled()) {
546                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
547            }
548            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
549            KahaAddMessageCommand command = new KahaAddMessageCommand();
550            command.setDestination(dest);
551            command.setMessageId(message.getMessageId().toProducerKey());
552            command.setPriority(message.getPriority());
553            command.setPrioritySupported(prioritizedMessages);
554            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
555            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
556            updateMessageCommand.setMessage(command);
557            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
558        }
559
560        @Override
561        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
562            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
563            command.setDestination(dest);
564            command.setMessageId(ack.getLastMessageId().toProducerKey());
565            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
566
567            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
568            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
569            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
570        }
571
572        @Override
573        public void removeAllMessages(ConnectionContext context) throws IOException {
574            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
575            command.setDestination(dest);
576            store(command, true, null, null);
577        }
578
579        @Override
580        public Message getMessage(MessageId identity) throws IOException {
581            final String key = identity.toProducerKey();
582
583            // Hopefully one day the page file supports concurrent read
584            // operations... but for now we must
585            // externally synchronize...
586            Location location;
587            indexLock.writeLock().lock();
588            try {
589                location = findMessageLocation(key, dest);
590            } finally {
591                indexLock.writeLock().unlock();
592            }
593            if (location == null) {
594                return null;
595            }
596
597            return loadMessage(location);
598        }
599
600        @Override
601        public int getMessageCount() throws IOException {
602            try {
603                lockAsyncJobQueue();
604                indexLock.writeLock().lock();
605                try {
606                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
607                        @Override
608                        public Integer execute(Transaction tx) throws IOException {
609                            // Iterate through all index entries to get a count
610                            // of messages in the destination.
611                            StoredDestination sd = getStoredDestination(dest, tx);
612                            int rc = 0;
613                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
614                                iterator.next();
615                                rc++;
616                            }
617                            rc = rc - ackedAndPrepared.size();
618                            return rc;
619                        }
620                    });
621                } finally {
622                    indexLock.writeLock().unlock();
623                }
624            } finally {
625                unlockAsyncJobQueue();
626            }
627        }
628
629        @Override
630        public boolean isEmpty() throws IOException {
631            indexLock.writeLock().lock();
632            try {
633                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
634                    @Override
635                    public Boolean execute(Transaction tx) throws IOException {
636                        // Iterate through all index entries to get a count of
637                        // messages in the destination.
638                        StoredDestination sd = getStoredDestination(dest, tx);
639                        return sd.locationIndex.isEmpty(tx);
640                    }
641                });
642            } finally {
643                indexLock.writeLock().unlock();
644            }
645        }
646
647        @Override
648        public void recover(final MessageRecoveryListener listener) throws Exception {
649            // recovery may involve expiry which will modify
650            indexLock.writeLock().lock();
651            try {
652                pageFile.tx().execute(new Transaction.Closure<Exception>() {
653                    @Override
654                    public void execute(Transaction tx) throws Exception {
655                        StoredDestination sd = getStoredDestination(dest, tx);
656                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
657                        sd.orderIndex.resetCursorPosition();
658                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
659                                .hasNext(); ) {
660                            Entry<Long, MessageKeys> entry = iterator.next();
661                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
662                                continue;
663                            }
664                            Message msg = loadMessage(entry.getValue().location);
665                            listener.recoverMessage(msg);
666                        }
667                    }
668                });
669            } finally {
670                indexLock.writeLock().unlock();
671            }
672        }
673
674        @Override
675        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
676            indexLock.writeLock().lock();
677            try {
678                pageFile.tx().execute(new Transaction.Closure<Exception>() {
679                    @Override
680                    public void execute(Transaction tx) throws Exception {
681                        StoredDestination sd = getStoredDestination(dest, tx);
682                        Entry<Long, MessageKeys> entry = null;
683                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
684                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
685                            entry = iterator.next();
686                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
687                                continue;
688                            }
689                            Message msg = loadMessage(entry.getValue().location);
690                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
691                            listener.recoverMessage(msg);
692                            counter++;
693                            if (counter >= maxReturned) {
694                                break;
695                            }
696                        }
697                        sd.orderIndex.stoppedIterating();
698                    }
699                });
700            } finally {
701                indexLock.writeLock().unlock();
702            }
703        }
704
705        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
706            int counter = 0;
707            String id;
708            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
709                id = iterator.next();
710                iterator.remove();
711                Long sequence = sd.messageIdIndex.get(tx, id);
712                if (sequence != null) {
713                    if (sd.orderIndex.alreadyDispatched(sequence)) {
714                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
715                        counter++;
716                        if (counter >= maxReturned) {
717                            break;
718                        }
719                    } else {
720                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
721                    }
722                } else {
723                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
724                }
725            }
726            return counter;
727        }
728
729
730        @Override
731        public void resetBatching() {
732            if (pageFile.isLoaded()) {
733                indexLock.writeLock().lock();
734                try {
735                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
736                        @Override
737                        public void execute(Transaction tx) throws Exception {
738                            StoredDestination sd = getExistingStoredDestination(dest, tx);
739                            if (sd != null) {
740                                sd.orderIndex.resetCursorPosition();}
741                            }
742                        });
743                } catch (Exception e) {
744                    LOG.error("Failed to reset batching",e);
745                } finally {
746                    indexLock.writeLock().unlock();
747                }
748            }
749        }
750
751        @Override
752        public void setBatch(final MessageId identity) throws IOException {
753            indexLock.writeLock().lock();
754            try {
755                pageFile.tx().execute(new Transaction.Closure<IOException>() {
756                    @Override
757                    public void execute(Transaction tx) throws IOException {
758                        StoredDestination sd = getStoredDestination(dest, tx);
759                        Long location = (Long) identity.getFutureOrSequenceLong();
760                        Long pending = sd.orderIndex.minPendingAdd();
761                        if (pending != null) {
762                            location = Math.min(location, pending-1);
763                        }
764                        sd.orderIndex.setBatch(tx, location);
765                    }
766                });
767            } finally {
768                indexLock.writeLock().unlock();
769            }
770        }
771
        /**
         * No-op in this store: the supplied memory usage is neither stored nor used.
         *
         * @param memoryUsage ignored
         */
        @Override
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }
        /**
         * Starts this store by delegating to the superclass; no extra work is done here.
         *
         * @throws Exception if the superclass start fails
         */
        @Override
        public void start() throws Exception {
            super.start();
        }
        /**
         * Stops this store by delegating to the superclass; no extra work is done here.
         *
         * @throws Exception if the superclass stop fails
         */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
783
784        protected void lockAsyncJobQueue() {
785            try {
786                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
787                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
788                }
789            } catch (Exception e) {
790                LOG.error("Failed to lock async jobs for " + this.destination, e);
791            }
792        }
793
794        protected void unlockAsyncJobQueue() {
795            this.localDestinationSemaphore.release(this.maxAsyncJobs);
796        }
797
798        protected void acquireLocalAsyncLock() {
799            try {
800                this.localDestinationSemaphore.acquire();
801            } catch (InterruptedException e) {
802                LOG.error("Failed to aquire async lock for " + this.destination, e);
803            }
804        }
805
806        protected void releaseLocalAsyncLock() {
807            this.localDestinationSemaphore.release();
808        }
809
810        @Override
811        public String toString(){
812            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
813        }
814    }
815
816    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
817        private final AtomicInteger subscriptionCount = new AtomicInteger();
818        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
819            super(destination);
820            this.subscriptionCount.set(getAllSubscriptions().length);
821            if (isConcurrentStoreAndDispatchTopics()) {
822                asyncTopicMaps.add(asyncTaskMap);
823            }
824        }
825
826        @Override
827        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
828                throws IOException {
829            if (isConcurrentStoreAndDispatchTopics()) {
830                message.beforeMarshall(wireFormat);
831                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
832                result.aquireLocks();
833                addTopicTask(this, result);
834                return result.getFuture();
835            } else {
836                return super.asyncAddTopicMessage(context, message);
837            }
838        }
839
840        @Override
841        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
842                                MessageId messageId, MessageAck ack) throws IOException {
843            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
844            if (isConcurrentStoreAndDispatchTopics()) {
845                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
846                StoreTopicTask task = null;
847                synchronized (asyncTaskMap) {
848                    task = (StoreTopicTask) asyncTaskMap.get(key);
849                }
850                if (task != null) {
851                    if (task.addSubscriptionKey(subscriptionKey)) {
852                        removeTopicTask(this, messageId);
853                        if (task.cancel()) {
854                            synchronized (asyncTaskMap) {
855                                asyncTaskMap.remove(key);
856                            }
857                        }
858                    }
859                } else {
860                    doAcknowledge(context, subscriptionKey, messageId, ack);
861                }
862            } else {
863                doAcknowledge(context, subscriptionKey, messageId, ack);
864            }
865        }
866
867        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
868                throws IOException {
869            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
870            command.setDestination(dest);
871            command.setSubscriptionKey(subscriptionKey);
872            command.setMessageId(messageId.toProducerKey());
873            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
874            if (ack != null && ack.isUnmatchedAck()) {
875                command.setAck(UNMATCHED);
876            } else {
877                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
878                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
879            }
880            store(command, false, null, null);
881        }
882
883        @Override
884        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
885            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
886                    .getSubscriptionName());
887            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
888            command.setDestination(dest);
889            command.setSubscriptionKey(subscriptionKey.toString());
890            command.setRetroactive(retroactive);
891            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
892            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
893            store(command, isEnableJournalDiskSyncs() && true, null, null);
894            this.subscriptionCount.incrementAndGet();
895        }
896
897        @Override
898        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
899            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
900            command.setDestination(dest);
901            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
902            store(command, isEnableJournalDiskSyncs() && true, null, null);
903            this.subscriptionCount.decrementAndGet();
904        }
905
906        @Override
907        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
908
909            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
910            indexLock.writeLock().lock();
911            try {
912                pageFile.tx().execute(new Transaction.Closure<IOException>() {
913                    @Override
914                    public void execute(Transaction tx) throws IOException {
915                        StoredDestination sd = getStoredDestination(dest, tx);
916                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
917                                .hasNext();) {
918                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
919                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
920                                    .getValue().getSubscriptionInfo().newInput()));
921                            subscriptions.add(info);
922
923                        }
924                    }
925                });
926            } finally {
927                indexLock.writeLock().unlock();
928            }
929
930            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
931            subscriptions.toArray(rc);
932            return rc;
933        }
934
935        @Override
936        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
937            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
938            indexLock.writeLock().lock();
939            try {
940                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
941                    @Override
942                    public SubscriptionInfo execute(Transaction tx) throws IOException {
943                        StoredDestination sd = getStoredDestination(dest, tx);
944                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
945                        if (command == null) {
946                            return null;
947                        }
948                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
949                                .getSubscriptionInfo().newInput()));
950                    }
951                });
952            } finally {
953                indexLock.writeLock().unlock();
954            }
955        }
956
957        @Override
958        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
959            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
960            indexLock.writeLock().lock();
961            try {
962                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
963                    @Override
964                    public Integer execute(Transaction tx) throws IOException {
965                        StoredDestination sd = getStoredDestination(dest, tx);
966                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
967                        if (cursorPos == null) {
968                            // The subscription might not exist.
969                            return 0;
970                        }
971
972                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
973                    }
974                });
975            } finally {
976                indexLock.writeLock().unlock();
977            }
978        }
979
980        @Override
981        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
982                throws Exception {
983            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
984            @SuppressWarnings("unused")
985            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
986            indexLock.writeLock().lock();
987            try {
988                pageFile.tx().execute(new Transaction.Closure<Exception>() {
989                    @Override
990                    public void execute(Transaction tx) throws Exception {
991                        StoredDestination sd = getStoredDestination(dest, tx);
992                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
993                        sd.orderIndex.setBatch(tx, cursorPos);
994                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
995                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
996                                .hasNext();) {
997                            Entry<Long, MessageKeys> entry = iterator.next();
998                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
999                                continue;
1000                            }
1001                            listener.recoverMessage(loadMessage(entry.getValue().location));
1002                        }
1003                        sd.orderIndex.resetCursorPosition();
1004                    }
1005                });
1006            } finally {
1007                indexLock.writeLock().unlock();
1008            }
1009        }
1010
1011        @Override
1012        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
1013                final MessageRecoveryListener listener) throws Exception {
1014            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1015            @SuppressWarnings("unused")
1016            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
1017            indexLock.writeLock().lock();
1018            try {
1019                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1020                    @Override
1021                    public void execute(Transaction tx) throws Exception {
1022                        StoredDestination sd = getStoredDestination(dest, tx);
1023                        sd.orderIndex.resetCursorPosition();
1024                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
1025                        if (moc == null) {
1026                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
1027                            if (pos == null) {
1028                                // sub deleted
1029                                return;
1030                            }
1031                            sd.orderIndex.setBatch(tx, pos);
1032                            moc = sd.orderIndex.cursor;
1033                        } else {
1034                            sd.orderIndex.cursor.sync(moc);
1035                        }
1036
1037                        Entry<Long, MessageKeys> entry = null;
1038                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
1039                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
1040                                .hasNext();) {
1041                            entry = iterator.next();
1042                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
1043                                continue;
1044                            }
1045                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
1046                                counter++;
1047                            }
1048                            if (counter >= maxReturned || listener.hasSpace() == false) {
1049                                break;
1050                            }
1051                        }
1052                        sd.orderIndex.stoppedIterating();
1053                        if (entry != null) {
1054                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
1055                            sd.subscriptionCursors.put(subscriptionKey, copy);
1056                        }
1057                    }
1058                });
1059            } finally {
1060                indexLock.writeLock().unlock();
1061            }
1062        }
1063
1064        @Override
1065        public void resetBatching(String clientId, String subscriptionName) {
1066            try {
1067                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1068                indexLock.writeLock().lock();
1069                try {
1070                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1071                        @Override
1072                        public void execute(Transaction tx) throws IOException {
1073                            StoredDestination sd = getStoredDestination(dest, tx);
1074                            sd.subscriptionCursors.remove(subscriptionKey);
1075                        }
1076                    });
1077                }finally {
1078                    indexLock.writeLock().unlock();
1079                }
1080            } catch (IOException e) {
1081                throw new RuntimeException(e);
1082            }
1083        }
1084    }
1085
    /**
     * Builds the key under which a subscription is stored: the client id and the
     * subscription name joined by a colon.
     *
     * @param clientId         client id part of the key
     * @param subscriptionName subscription name part of the key
     * @return {@code clientId + ":" + subscriptionName}
     */
    String subscriptionKey(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }
1089
1090    @Override
1091    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1092        String key = key(convert(destination));
1093        MessageStore store = storeCache.get(key(convert(destination)));
1094        if (store == null) {
1095            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1096            synchronized (storeCache) {
1097                store = storeCache.put(key, queueStore);
1098                if (store != null) {
1099                    storeCache.put(key, store);
1100                } else {
1101                    store = queueStore;
1102                }
1103            }
1104        }
1105
1106        return store;
1107    }
1108
1109    @Override
1110    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1111        String key = key(convert(destination));
1112        MessageStore store = storeCache.get(key(convert(destination)));
1113        if (store == null) {
1114            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1115            synchronized (storeCache) {
1116                store = storeCache.put(key, topicStore);
1117                if (store != null) {
1118                    storeCache.put(key, store);
1119                } else {
1120                    store = topicStore;
1121                }
1122            }
1123        }
1124
1125        return (TopicMessageStore) store;
1126    }
1127
    /**
     * Cleanup hook for removing any state associated with the given destination.
     * The current implementation is empty: nothing is removed, and the message
     * store is not stopped (it might not be cached).
     *
     * @param destination
     *            Destination to forget
     */
    @Override
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }
1138
    /**
     * Cleanup hook for removing any state associated with the given destination.
     * The current implementation is empty: nothing is removed, and the message
     * store is not stopped (it might not be cached).
     *
     * @param destination
     *            Destination to forget
     */
    @Override
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }
1149
    /**
     * Requests deletion of all messages by setting the {@code deleteAllMessages} flag.
     * Nothing is deleted by this call itself.
     * NOTE(review): presumably the flag is acted on when the store is next loaded; confirm.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1154
1155    @Override
1156    public Set<ActiveMQDestination> getDestinations() {
1157        try {
1158            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1159            indexLock.writeLock().lock();
1160            try {
1161                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1162                    @Override
1163                    public void execute(Transaction tx) throws IOException {
1164                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1165                                .hasNext();) {
1166                            Entry<String, StoredDestination> entry = iterator.next();
1167                            //Removing isEmpty topic check - see AMQ-5875
1168                            rc.add(convert(entry.getKey()));
1169                        }
1170                    }
1171                });
1172            }finally {
1173                indexLock.writeLock().unlock();
1174            }
1175            return rc;
1176        } catch (IOException e) {
1177            throw new RuntimeException(e);
1178        }
1179    }
1180
    /**
     * Always returns {@code 0}; this implementation does not report a broker sequence id.
     *
     * @return {@code 0}
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1185
1186    @Override
1187    public long getLastProducerSequenceId(ProducerId id) {
1188        indexLock.writeLock().lock();
1189        try {
1190            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1191        } finally {
1192            indexLock.writeLock().unlock();
1193        }
1194    }
1195
1196    @Override
1197    public long size() {
1198        try {
1199            return journalSize.get() + getPageFile().getDiskSize();
1200        } catch (IOException e) {
1201            throw new RuntimeException(e);
1202        }
1203    }
1204
    /**
     * Not implemented by this store.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not implemented by this store.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not implemented by this store.
     *
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1217
    /**
     * Runs a checkpoint by delegating to the superclass {@code checkpointCleanup}.
     *
     * @param sync passed through unchanged to {@code checkpointCleanup}
     * @throws IOException if the superclass checkpoint fails
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1222
1223    // /////////////////////////////////////////////////////////////////
1224    // Internal helper methods.
1225    // /////////////////////////////////////////////////////////////////
1226
1227    /**
1228     * @param location
1229     * @return
1230     * @throws IOException
1231     */
1232    Message loadMessage(Location location) throws IOException {
1233        try {
1234            JournalCommand<?> command = load(location);
1235            KahaAddMessageCommand addMessage = null;
1236            switch (command.type()) {
1237                case KAHA_UPDATE_MESSAGE_COMMAND:
1238                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1239                    break;
1240                case KAHA_ADD_MESSAGE_COMMAND:
1241                    addMessage = (KahaAddMessageCommand) command;
1242                    break;
1243                default:
1244                    throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location);
1245            }
1246            if (!addMessage.hasMessage()) {
1247                throw new IOException("Could not load journal record, null message content at location: " + location);
1248            }
1249            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1250            return msg;
1251        } catch (Throwable t) {
1252            IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t);
1253            LOG.error("Failed to load message at: {}", location , ioe);
1254            brokerService.handleIOException(ioe);
1255            throw ioe;
1256        }
1257    }
1258
1259    // /////////////////////////////////////////////////////////////////
1260    // Internal conversion methods.
1261    // /////////////////////////////////////////////////////////////////
1262
1263    KahaLocation convert(Location location) {
1264        KahaLocation rc = new KahaLocation();
1265        rc.setLogId(location.getDataFileId());
1266        rc.setOffset(location.getOffset());
1267        return rc;
1268    }
1269
1270    KahaDestination convert(ActiveMQDestination dest) {
1271        KahaDestination rc = new KahaDestination();
1272        rc.setName(dest.getPhysicalName());
1273        switch (dest.getDestinationType()) {
1274        case ActiveMQDestination.QUEUE_TYPE:
1275            rc.setType(DestinationType.QUEUE);
1276            return rc;
1277        case ActiveMQDestination.TOPIC_TYPE:
1278            rc.setType(DestinationType.TOPIC);
1279            return rc;
1280        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1281            rc.setType(DestinationType.TEMP_QUEUE);
1282            return rc;
1283        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1284            rc.setType(DestinationType.TEMP_TOPIC);
1285            return rc;
1286        default:
1287            return null;
1288        }
1289    }
1290
1291    ActiveMQDestination convert(String dest) {
1292        int p = dest.indexOf(":");
1293        if (p < 0) {
1294            throw new IllegalArgumentException("Not in the valid destination format");
1295        }
1296        int type = Integer.parseInt(dest.substring(0, p));
1297        String name = dest.substring(p + 1);
1298        return convert(type, name);
1299    }
1300
1301    private ActiveMQDestination convert(KahaDestination commandDestination) {
1302        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1303    }
1304
1305    private ActiveMQDestination convert(int type, String name) {
1306        switch (KahaDestination.DestinationType.valueOf(type)) {
1307        case QUEUE:
1308            return new ActiveMQQueue(name);
1309        case TOPIC:
1310            return new ActiveMQTopic(name);
1311        case TEMP_QUEUE:
1312            return new ActiveMQTempQueue(name);
1313        case TEMP_TOPIC:
1314            return new ActiveMQTempTopic(name);
1315        default:
1316            throw new IllegalArgumentException("Not in the valid destination format");
1317        }
1318    }
1319
    /**
     * @return the transformer currently held in the {@code transactionIdTransformer} field
     */
    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }
1323
    /**
     * Replaces the transformer held in the {@code transactionIdTransformer} field.
     *
     * @param transactionIdTransformer transformer to use from now on
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1327
1328    static class AsyncJobKey {
1329        MessageId id;
1330        ActiveMQDestination destination;
1331
1332        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1333            this.id = id;
1334            this.destination = destination;
1335        }
1336
1337        @Override
1338        public boolean equals(Object obj) {
1339            if (obj == this) {
1340                return true;
1341            }
1342            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1343                    && destination.equals(((AsyncJobKey) obj).destination);
1344        }
1345
1346        @Override
1347        public int hashCode() {
1348            return id.hashCode() + destination.hashCode();
1349        }
1350
1351        @Override
1352        public String toString() {
1353            return destination.getPhysicalName() + "-" + id;
1354        }
1355    }
1356
1357    public interface StoreTask {
1358        public boolean cancel();
1359
1360        public void aquireLocks();
1361
1362        public void releaseLocks();
1363    }
1364
1365    class StoreQueueTask implements Runnable, StoreTask {
1366        protected final Message message;
1367        protected final ConnectionContext context;
1368        protected final KahaDBMessageStore store;
1369        protected final InnerFutureTask future;
1370        protected final AtomicBoolean done = new AtomicBoolean();
1371        protected final AtomicBoolean locked = new AtomicBoolean();
1372
1373        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1374            this.store = store;
1375            this.context = context;
1376            this.message = message;
1377            this.future = new InnerFutureTask(this);
1378        }
1379
1380        public ListenableFuture<Object> getFuture() {
1381            return this.future;
1382        }
1383
1384        @Override
1385        public boolean cancel() {
1386            if (this.done.compareAndSet(false, true)) {
1387                return this.future.cancel(false);
1388            }
1389            return false;
1390        }
1391
1392        @Override
1393        public void aquireLocks() {
1394            if (this.locked.compareAndSet(false, true)) {
1395                try {
1396                    globalQueueSemaphore.acquire();
1397                    store.acquireLocalAsyncLock();
1398                    message.incrementReferenceCount();
1399                } catch (InterruptedException e) {
1400                    LOG.warn("Failed to aquire lock", e);
1401                }
1402            }
1403
1404        }
1405
1406        @Override
1407        public void releaseLocks() {
1408            if (this.locked.compareAndSet(true, false)) {
1409                store.releaseLocalAsyncLock();
1410                globalQueueSemaphore.release();
1411                message.decrementReferenceCount();
1412            }
1413        }
1414
1415        @Override
1416        public void run() {
1417            this.store.doneTasks++;
1418            try {
1419                if (this.done.compareAndSet(false, true)) {
1420                    this.store.addMessage(context, message);
1421                    removeQueueTask(this.store, this.message.getMessageId());
1422                    this.future.complete();
1423                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
1424                    System.err.println(this.store.dest.getName() + " cancelled: "
1425                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1426                    this.store.canceledTasks = this.store.doneTasks = 0;
1427                }
1428            } catch (Throwable t) {
1429                this.future.setException(t);
1430                removeQueueTask(this.store, this.message.getMessageId());
1431            }
1432        }
1433
1434        protected Message getMessage() {
1435            return this.message;
1436        }
1437
1438        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1439
1440            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
1441
1442            public InnerFutureTask(Runnable runnable) {
1443                super(runnable, null);
1444            }
1445
1446            public void setException(final Throwable e) {
1447                super.setException(e);
1448            }
1449
1450            public void complete() {
1451                super.set(null);
1452            }
1453
1454            @Override
1455            public void done() {
1456                fireListener();
1457            }
1458
1459            @Override
1460            public void addListener(Runnable listener) {
1461                this.listenerRef.set(listener);
1462                if (isDone()) {
1463                    fireListener();
1464                }
1465            }
1466
1467            private void fireListener() {
1468                Runnable listener = listenerRef.getAndSet(null);
1469                if (listener != null) {
1470                    try {
1471                        listener.run();
1472                    } catch (Exception ignored) {
1473                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1474                    }
1475                }
1476            }
1477        }
1478    }
1479
1480    class StoreTopicTask extends StoreQueueTask {
1481        private final int subscriptionCount;
1482        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1483        private final KahaDBTopicMessageStore topicStore;
1484        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1485                int subscriptionCount) {
1486            super(store, context, message);
1487            this.topicStore = store;
1488            this.subscriptionCount = subscriptionCount;
1489
1490        }
1491
1492        @Override
1493        public void aquireLocks() {
1494            if (this.locked.compareAndSet(false, true)) {
1495                try {
1496                    globalTopicSemaphore.acquire();
1497                    store.acquireLocalAsyncLock();
1498                    message.incrementReferenceCount();
1499                } catch (InterruptedException e) {
1500                    LOG.warn("Failed to aquire lock", e);
1501                }
1502            }
1503        }
1504
1505        @Override
1506        public void releaseLocks() {
1507            if (this.locked.compareAndSet(true, false)) {
1508                message.decrementReferenceCount();
1509                store.releaseLocalAsyncLock();
1510                globalTopicSemaphore.release();
1511            }
1512        }
1513
1514        /**
1515         * add a key
1516         *
1517         * @param key
1518         * @return true if all acknowledgements received
1519         */
1520        public boolean addSubscriptionKey(String key) {
1521            synchronized (this.subscriptionKeys) {
1522                this.subscriptionKeys.add(key);
1523            }
1524            return this.subscriptionKeys.size() >= this.subscriptionCount;
1525        }
1526
1527        @Override
1528        public void run() {
1529            this.store.doneTasks++;
1530            try {
1531                if (this.done.compareAndSet(false, true)) {
1532                    this.topicStore.addMessage(context, message);
1533                    // apply any acks we have
1534                    synchronized (this.subscriptionKeys) {
1535                        for (String key : this.subscriptionKeys) {
1536                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1537
1538                        }
1539                    }
1540                    removeTopicTask(this.topicStore, this.message.getMessageId());
1541                    this.future.complete();
1542                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1543                    System.err.println(this.store.dest.getName() + " cancelled: "
1544                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1545                    this.store.canceledTasks = this.store.doneTasks = 0;
1546                }
1547            } catch (Throwable t) {
1548                this.future.setException(t);
1549                removeTopicTask(this.topicStore, this.message.getMessageId());
1550            }
1551        }
1552    }
1553
1554    public class StoreTaskExecutor extends ThreadPoolExecutor {
1555
1556        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1557            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1558        }
1559
1560        @Override
1561        protected void afterExecute(Runnable runnable, Throwable throwable) {
1562            super.afterExecute(runnable, throwable);
1563
1564            if (runnable instanceof StoreTask) {
1565               ((StoreTask)runnable).releaseLocks();
1566            }
1567        }
1568    }
1569
1570    @Override
1571    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1572        return new JobSchedulerStoreImpl();
1573    }
1574}