001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedHashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031import java.util.Set;
032import java.util.concurrent.BlockingQueue;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.FutureTask;
035import java.util.concurrent.LinkedBlockingQueue;
036import java.util.concurrent.Semaphore;
037import java.util.concurrent.ThreadFactory;
038import java.util.concurrent.ThreadPoolExecutor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.apache.activemq.broker.ConnectionContext;
046import org.apache.activemq.broker.region.BaseDestination;
047import org.apache.activemq.broker.scheduler.JobSchedulerStore;
048import org.apache.activemq.command.ActiveMQDestination;
049import org.apache.activemq.command.ActiveMQQueue;
050import org.apache.activemq.command.ActiveMQTempQueue;
051import org.apache.activemq.command.ActiveMQTempTopic;
052import org.apache.activemq.command.ActiveMQTopic;
053import org.apache.activemq.command.Message;
054import org.apache.activemq.command.MessageAck;
055import org.apache.activemq.command.MessageId;
056import org.apache.activemq.command.ProducerId;
057import org.apache.activemq.command.SubscriptionInfo;
058import org.apache.activemq.command.TransactionId;
059import org.apache.activemq.openwire.OpenWireFormat;
060import org.apache.activemq.protobuf.Buffer;
061import org.apache.activemq.store.AbstractMessageStore;
062import org.apache.activemq.store.IndexListener;
063import org.apache.activemq.store.ListenableFuture;
064import org.apache.activemq.store.MessageRecoveryListener;
065import org.apache.activemq.store.MessageStore;
066import org.apache.activemq.store.PersistenceAdapter;
067import org.apache.activemq.store.ProxyMessageStore;
068import org.apache.activemq.store.TopicMessageStore;
069import org.apache.activemq.store.TransactionIdTransformer;
070import org.apache.activemq.store.TransactionStore;
071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
072import org.apache.activemq.store.kahadb.data.KahaDestination;
073import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
074import org.apache.activemq.store.kahadb.data.KahaLocation;
075import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
076import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
077import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
078import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
079import org.apache.activemq.store.kahadb.disk.journal.Location;
080import org.apache.activemq.store.kahadb.disk.page.Transaction;
081import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
082import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
083import org.apache.activemq.usage.MemoryUsage;
084import org.apache.activemq.usage.SystemUsage;
085import org.apache.activemq.util.IOExceptionSupport;
086import org.apache.activemq.util.ServiceStopper;
087import org.apache.activemq.util.ThreadPoolUtils;
088import org.apache.activemq.wireformat.WireFormat;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
093    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
094    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
095
096    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
097    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
098            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
099    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
100    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
101            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
102
103    protected ExecutorService queueExecutor;
104    protected ExecutorService topicExecutor;
105    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
106    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
107    final WireFormat wireFormat = new OpenWireFormat();
108    private SystemUsage usageManager;
109    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
110    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
111    Semaphore globalQueueSemaphore;
112    Semaphore globalTopicSemaphore;
113    private boolean concurrentStoreAndDispatchQueues = true;
114    // when true, message order may be compromised when cache is exhausted if store is out
115    // or order w.r.t cache
116    private boolean concurrentStoreAndDispatchTopics = false;
117    private final boolean concurrentStoreAndDispatchTransactions = false;
118    private int maxAsyncJobs = MAX_ASYNC_JOBS;
119    private final KahaDBTransactionStore transactionStore;
120    private TransactionIdTransformer transactionIdTransformer;
121
122    public KahaDBStore() {
123        this.transactionStore = new KahaDBTransactionStore(this);
124        this.transactionIdTransformer = new TransactionIdTransformer() {
125            @Override
126            public TransactionId transform(TransactionId txid) {
127                return txid;
128            }
129        };
130    }
131
132    @Override
133    public String toString() {
134        return "KahaDB:[" + directory.getAbsolutePath() + "]";
135    }
136
137    @Override
138    public void setBrokerName(String brokerName) {
139    }
140
141    @Override
142    public void setUsageManager(SystemUsage usageManager) {
143        this.usageManager = usageManager;
144    }
145
146    public SystemUsage getUsageManager() {
147        return this.usageManager;
148    }
149
150    /**
151     * @return the concurrentStoreAndDispatch
152     */
153    public boolean isConcurrentStoreAndDispatchQueues() {
154        return this.concurrentStoreAndDispatchQueues;
155    }
156
157    /**
158     * @param concurrentStoreAndDispatch
159     *            the concurrentStoreAndDispatch to set
160     */
161    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
162        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
163    }
164
165    /**
166     * @return the concurrentStoreAndDispatch
167     */
168    public boolean isConcurrentStoreAndDispatchTopics() {
169        return this.concurrentStoreAndDispatchTopics;
170    }
171
172    /**
173     * @param concurrentStoreAndDispatch
174     *            the concurrentStoreAndDispatch to set
175     */
176    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
177        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
178    }
179
180    public boolean isConcurrentStoreAndDispatchTransactions() {
181        return this.concurrentStoreAndDispatchTransactions;
182    }
183
184    /**
185     * @return the maxAsyncJobs
186     */
187    public int getMaxAsyncJobs() {
188        return this.maxAsyncJobs;
189    }
190
191    /**
192     * @param maxAsyncJobs
193     *            the maxAsyncJobs to set
194     */
195    public void setMaxAsyncJobs(int maxAsyncJobs) {
196        this.maxAsyncJobs = maxAsyncJobs;
197    }
198
199    @Override
200    public void doStart() throws Exception {
201        if (brokerService != null) {
202            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
203            wireFormat.setVersion(metadata.openwireVersion);
204
205            if (LOG.isDebugEnabled()) {
206                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
207            }
208
209        }
210        super.doStart();
211
212        if (brokerService != null) {
213            // In case the recovered store used a different OpenWire version log a warning
214            // to assist in determining why journal reads fail.
215            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
216                LOG.warn("Recovered Store uses a different OpenWire version[{}] " +
217                         "than the version configured[{}].",
218                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
219            }
220        }
221
222        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
223        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
224        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
225        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
226        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
227            asyncQueueJobQueue, new ThreadFactory() {
228                @Override
229                public Thread newThread(Runnable runnable) {
230                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
231                    thread.setDaemon(true);
232                    return thread;
233                }
234            });
235        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
236            asyncTopicJobQueue, new ThreadFactory() {
237                @Override
238                public Thread newThread(Runnable runnable) {
239                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
240                    thread.setDaemon(true);
241                    return thread;
242                }
243            });
244    }
245
246    @Override
247    public void doStop(ServiceStopper stopper) throws Exception {
248        // drain down async jobs
249        LOG.info("Stopping async queue tasks");
250        if (this.globalQueueSemaphore != null) {
251            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
252        }
253        synchronized (this.asyncQueueMaps) {
254            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
255                synchronized (m) {
256                    for (StoreTask task : m.values()) {
257                        task.cancel();
258                    }
259                }
260            }
261            this.asyncQueueMaps.clear();
262        }
263        LOG.info("Stopping async topic tasks");
264        if (this.globalTopicSemaphore != null) {
265            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
266        }
267        synchronized (this.asyncTopicMaps) {
268            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
269                synchronized (m) {
270                    for (StoreTask task : m.values()) {
271                        task.cancel();
272                    }
273                }
274            }
275            this.asyncTopicMaps.clear();
276        }
277        if (this.globalQueueSemaphore != null) {
278            this.globalQueueSemaphore.drainPermits();
279        }
280        if (this.globalTopicSemaphore != null) {
281            this.globalTopicSemaphore.drainPermits();
282        }
283        if (this.queueExecutor != null) {
284            ThreadPoolUtils.shutdownNow(queueExecutor);
285            queueExecutor = null;
286        }
287        if (this.topicExecutor != null) {
288            ThreadPoolUtils.shutdownNow(topicExecutor);
289            topicExecutor = null;
290        }
291        LOG.info("Stopped KahaDB");
292        super.doStop(stopper);
293    }
294
295    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
296        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
297            @Override
298            public Location execute(Transaction tx) throws IOException {
299                StoredDestination sd = getStoredDestination(destination, tx);
300                Long sequence = sd.messageIdIndex.get(tx, key);
301                if (sequence == null) {
302                    return null;
303                }
304                return sd.orderIndex.get(tx, sequence).location;
305            }
306        });
307    }
308
309    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
310        StoreQueueTask task = null;
311        synchronized (store.asyncTaskMap) {
312            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
313        }
314        return task;
315    }
316
317    // with asyncTaskMap locked
318    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
319        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
320        this.queueExecutor.execute(task);
321    }
322
323    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
324        StoreTopicTask task = null;
325        synchronized (store.asyncTaskMap) {
326            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
327        }
328        return task;
329    }
330
331    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
332        synchronized (store.asyncTaskMap) {
333            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
334        }
335        this.topicExecutor.execute(task);
336    }
337
338    @Override
339    public TransactionStore createTransactionStore() throws IOException {
340        return this.transactionStore;
341    }
342
343    public boolean getForceRecoverIndex() {
344        return this.forceRecoverIndex;
345    }
346
347    public void setForceRecoverIndex(boolean forceRecoverIndex) {
348        this.forceRecoverIndex = forceRecoverIndex;
349    }
350
351    public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException {
352        if (preparedAcks != null) {
353            Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
354            for (MessageAck ack : preparedAcks) {
355                stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
356            }
357            ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
358            for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
359                for (MessageAck ack : preparedAcks) {
360                    if (entry.getKey().equals(ack.getDestination())) {
361                        perStoreAcks.add(ack);
362                    }
363                }
364                entry.getValue().forgetRecoveredAcks(perStoreAcks, isRollback);
365                perStoreAcks.clear();
366            }
367        }
368    }
369
370    public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException {
371        Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>();
372        for (MessageAck ack : preparedAcks) {
373            stores.put(ack.getDestination(), findMatchingStore(ack.getDestination()));
374        }
375        ArrayList<MessageAck> perStoreAcks = new ArrayList<>();
376        for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) {
377            for (MessageAck ack : preparedAcks) {
378                if (entry.getKey().equals(ack.getDestination())) {
379                    perStoreAcks.add(ack);
380                }
381            }
382            entry.getValue().trackRecoveredAcks(perStoreAcks);
383            perStoreAcks.clear();
384        }
385    }
386
387    private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException {
388        ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination));
389        if (store == null) {
390            if (activeMQDestination.isQueue()) {
391                store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination);
392            } else {
393                store = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination);
394            }
395        }
396        return (KahaDBMessageStore) store.getDelegate();
397    }
398
399    public class KahaDBMessageStore extends AbstractMessageStore {
400        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
401        protected KahaDestination dest;
402        private final int maxAsyncJobs;
403        private final Semaphore localDestinationSemaphore;
404        protected final HashMap<String, Set<String>> ackedAndPreparedMap = new HashMap<String, Set<String>>();
405        protected final HashMap<String, Set<String>> rolledBackAcksMap = new HashMap<String, Set<String>>();
406
407        double doneTasks, canceledTasks = 0;
408
409        public KahaDBMessageStore(ActiveMQDestination destination) {
410            super(destination);
411            this.dest = convert(destination);
412            this.maxAsyncJobs = getMaxAsyncJobs();
413            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
414        }
415
416        @Override
417        public ActiveMQDestination getDestination() {
418            return destination;
419        }
420
421
422        private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) {
423            return destination.isQueue() ? destination.getPhysicalName() : ack.getConsumerId().getConnectionId();
424        }
425
426        // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
427        // till then they are skipped by the store.
428        // 'at most once' XA guarantee
429        public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
430            indexLock.writeLock().lock();
431            try {
432                for (MessageAck ack : acks) {
433                    final String key = recoveredTxStateMapKey(destination, ack);
434                    Set ackedAndPrepared = ackedAndPreparedMap.get(key);
435                    if (ackedAndPrepared == null) {
436                        ackedAndPrepared = new LinkedHashSet<String>();
437                        ackedAndPreparedMap.put(key, ackedAndPrepared);
438                    }
439                    ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
440                }
441            } finally {
442                indexLock.writeLock().unlock();
443            }
444        }
445
446        public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
447            if (acks != null) {
448                indexLock.writeLock().lock();
449                try {
450                    for (MessageAck ack : acks) {
451                        final String id = ack.getLastMessageId().toProducerKey();
452                        final String key = recoveredTxStateMapKey(destination, ack);
453                        Set ackedAndPrepared = ackedAndPreparedMap.get(key);
454                        if (ackedAndPrepared != null) {
455                            ackedAndPrepared.remove(id);
456                            if (ackedAndPreparedMap.isEmpty()) {
457                                ackedAndPreparedMap.remove(key);
458                            }
459                        }
460                        if (rollback) {
461                            Set rolledBackAcks = rolledBackAcksMap.get(key);
462                            if (rolledBackAcks == null) {
463                                rolledBackAcks = new LinkedHashSet<String>();
464                                rolledBackAcksMap.put(key, rolledBackAcks);
465                            }
466                            rolledBackAcks.add(id);
467                            //incrementAndAddSizeToStoreStat(dest, 0);
468                        }
469                    }
470                } finally {
471                    indexLock.writeLock().unlock();
472                }
473            }
474        }
475
476        @Override
477        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
478                throws IOException {
479            if (isConcurrentStoreAndDispatchQueues()) {
480                message.beforeMarshall(wireFormat);
481                StoreQueueTask result = new StoreQueueTask(this, context, message);
482                ListenableFuture<Object> future = result.getFuture();
483                message.getMessageId().setFutureOrSequenceLong(future);
484                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
485                result.aquireLocks();
486                synchronized (asyncTaskMap) {
487                    addQueueTask(this, result);
488                    if (indexListener != null) {
489                        indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
490                    }
491                }
492                return future;
493            } else {
494                return super.asyncAddQueueMessage(context, message);
495            }
496        }
497
498        @Override
499        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
500            if (isConcurrentStoreAndDispatchQueues()) {
501                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
502                StoreQueueTask task = null;
503                synchronized (asyncTaskMap) {
504                    task = (StoreQueueTask) asyncTaskMap.get(key);
505                }
506                if (task != null) {
507                    if (ack.isInTransaction() || !task.cancel()) {
508                        try {
509                            task.future.get();
510                        } catch (InterruptedException e) {
511                            throw new InterruptedIOException(e.toString());
512                        } catch (Exception ignored) {
513                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
514                        }
515                        removeMessage(context, ack);
516                    } else {
517                        indexLock.writeLock().lock();
518                        try {
519                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
520                        } finally {
521                            indexLock.writeLock().unlock();
522                        }
523                        synchronized (asyncTaskMap) {
524                            asyncTaskMap.remove(key);
525                        }
526                    }
527                } else {
528                    removeMessage(context, ack);
529                }
530            } else {
531                removeMessage(context, ack);
532            }
533        }
534
535        @Override
536        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
537            final KahaAddMessageCommand command = new KahaAddMessageCommand();
538            command.setDestination(dest);
539            command.setMessageId(message.getMessageId().toProducerKey());
540            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
541            command.setPriority(message.getPriority());
542            command.setPrioritySupported(isPrioritizedMessages());
543            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
544            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
545            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
546                // sync add? (for async, future present from getFutureOrSequenceLong)
547                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
548
549                @Override
550                public void sequenceAssignedWithIndexLocked(final long sequence) {
551                    message.getMessageId().setFutureOrSequenceLong(sequence);
552                    if (indexListener != null) {
553                        if (possibleFuture == null) {
554                            trackPendingAdd(dest, sequence);
555                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
556                                @Override
557                                public void run() {
558                                    trackPendingAddComplete(dest, sequence);
559                                }
560                            }));
561                        }
562                    }
563                }
564            }, null);
565        }
566
567        @Override
568        public void updateMessage(Message message) throws IOException {
569            if (LOG.isTraceEnabled()) {
570                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
571            }
572            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
573            KahaAddMessageCommand command = new KahaAddMessageCommand();
574            command.setDestination(dest);
575            command.setMessageId(message.getMessageId().toProducerKey());
576            command.setPriority(message.getPriority());
577            command.setPrioritySupported(prioritizedMessages);
578            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
579            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
580            updateMessageCommand.setMessage(command);
581            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
582        }
583
584        @Override
585        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
586            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
587            command.setDestination(dest);
588            command.setMessageId(ack.getLastMessageId().toProducerKey());
589            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
590
591            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
592            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
593            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
594        }
595
596        @Override
597        public void removeAllMessages(ConnectionContext context) throws IOException {
598            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
599            command.setDestination(dest);
600            store(command, true, null, null);
601        }
602
603        @Override
604        public Message getMessage(MessageId identity) throws IOException {
605            final String key = identity.toProducerKey();
606
607            // Hopefully one day the page file supports concurrent read
608            // operations... but for now we must
609            // externally synchronize...
610            Location location;
611            indexLock.writeLock().lock();
612            try {
613                location = findMessageLocation(key, dest);
614            } finally {
615                indexLock.writeLock().unlock();
616            }
617            if (location == null) {
618                return null;
619            }
620
621            return loadMessage(location);
622        }
623
624        @Override
625        public int getMessageCount() throws IOException {
626            try {
627                lockAsyncJobQueue();
628                indexLock.writeLock().lock();
629                try {
630                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
631                        @Override
632                        public Integer execute(Transaction tx) throws IOException {
633                            // Iterate through all index entries to get a count
634                            // of messages in the destination.
635                            StoredDestination sd = getStoredDestination(dest, tx);
636                            int rc = 0;
637                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
638                                iterator.next();
639                                rc++;
640                            }
641                            Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
642                            if (ackedAndPrepared != null) {
643                                rc = rc - ackedAndPrepared.size();
644                            }
645                            return rc;
646                        }
647                    });
648                } finally {
649                    indexLock.writeLock().unlock();
650                }
651            } finally {
652                unlockAsyncJobQueue();
653            }
654        }
655
656        @Override
657        public boolean isEmpty() throws IOException {
658            indexLock.writeLock().lock();
659            try {
660                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
661                    @Override
662                    public Boolean execute(Transaction tx) throws IOException {
663                        // Iterate through all index entries to get a count of
664                        // messages in the destination.
665                        StoredDestination sd = getStoredDestination(dest, tx);
666                        return sd.locationIndex.isEmpty(tx);
667                    }
668                });
669            } finally {
670                indexLock.writeLock().unlock();
671            }
672        }
673
674        @Override
675        public void recover(final MessageRecoveryListener listener) throws Exception {
676            // recovery may involve expiry which will modify
677            indexLock.writeLock().lock();
678            try {
679                pageFile.tx().execute(new Transaction.Closure<Exception>() {
680                    @Override
681                    public void execute(Transaction tx) throws Exception {
682                        StoredDestination sd = getStoredDestination(dest, tx);
683                        recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener);
684                        sd.orderIndex.resetCursorPosition();
685                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
686                                .hasNext(); ) {
687                            Entry<Long, MessageKeys> entry = iterator.next();
688                            Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
689                            if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
690                                continue;
691                            }
692                            Message msg = loadMessage(entry.getValue().location);
693                            listener.recoverMessage(msg);
694                        }
695                    }
696                });
697            } finally {
698                indexLock.writeLock().unlock();
699            }
700        }
701
702        @Override
703        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
704            indexLock.writeLock().lock();
705            try {
706                pageFile.tx().execute(new Transaction.Closure<Exception>() {
707                    @Override
708                    public void execute(Transaction tx) throws Exception {
709                        StoredDestination sd = getStoredDestination(dest, tx);
710                        Entry<Long, MessageKeys> entry = null;
711                        int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
712                        Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
713                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
714                            entry = iterator.next();
715                            if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
716                                continue;
717                            }
718                            Message msg = loadMessage(entry.getValue().location);
719                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
720                            listener.recoverMessage(msg);
721                            counter++;
722                            if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
723                                break;
724                            }
725                        }
726                        sd.orderIndex.stoppedIterating();
727                    }
728                });
729            } finally {
730                indexLock.writeLock().unlock();
731            }
732        }
733
        /**
         * Replays messages whose acks were rolled back and that the order index
         * reports as already dispatched.
         *
         * Ids are taken from the set registered in {@code rolledBackAcksMap}
         * under the given key; each id is removed from that set as soon as it
         * is read, whether or not the message ends up being replayed.
         *
         * @param recoveredTxStateMapKey key into {@code rolledBackAcksMap}
         * @param sd stored destination whose indexes are consulted
         * @param tx page file transaction used for the index lookups
         * @param maxReturned replay stops once this many messages were handed to the listener
         * @param listener receiver of the replayed messages
         * @return number of messages handed to the listener (0 when no set is registered)
         * @throws Exception propagated from the index, the journal or the listener
         */
        protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
            int counter = 0;
            String id;

            Set rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey);
            if (rolledBackAcks == null) {
                // nothing rolled back for this key
                return counter;
            }
            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
                id = iterator.next();
                // consume the id up front; it is not revisited on a later call
                iterator.remove();
                Long sequence = sd.messageIdIndex.get(tx, id);
                if (sequence != null) {
                    if (sd.orderIndex.alreadyDispatched(sequence)) {
                        // replay it here, re-reading the message from its journal location
                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
                        counter++;
                        if (counter >= maxReturned) {
                            // batch is full; remaining ids stay in the set for a later call
                            break;
                        }
                    } else {
                        LOG.debug("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
                    }
                } else {
                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
                }
            }
            // drop the map entry once every id has been consumed
            if (rolledBackAcks.isEmpty()) {
                rolledBackAcksMap.remove(recoveredTxStateMapKey);
            }
            return counter;
        }
765
766
767        @Override
768        public void resetBatching() {
769            if (pageFile.isLoaded()) {
770                indexLock.writeLock().lock();
771                try {
772                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
773                        @Override
774                        public void execute(Transaction tx) throws Exception {
775                            StoredDestination sd = getExistingStoredDestination(dest, tx);
776                            if (sd != null) {
777                                sd.orderIndex.resetCursorPosition();}
778                            }
779                        });
780                } catch (Exception e) {
781                    LOG.error("Failed to reset batching",e);
782                } finally {
783                    indexLock.writeLock().unlock();
784                }
785            }
786        }
787
788        @Override
789        public void setBatch(final MessageId identity) throws IOException {
790            indexLock.writeLock().lock();
791            try {
792                pageFile.tx().execute(new Transaction.Closure<IOException>() {
793                    @Override
794                    public void execute(Transaction tx) throws IOException {
795                        StoredDestination sd = getStoredDestination(dest, tx);
796                        Long location = (Long) identity.getFutureOrSequenceLong();
797                        Long pending = sd.orderIndex.minPendingAdd();
798                        if (pending != null) {
799                            location = Math.min(location, pending-1);
800                        }
801                        sd.orderIndex.setBatch(tx, location);
802                    }
803                });
804            } finally {
805                indexLock.writeLock().unlock();
806            }
807        }
808
        /**
         * No-op: the supplied memory usage is ignored by this store.
         *
         * @param memoryUsage unused
         */
        @Override
        public void setMemoryUsage(MemoryUsage memoryUsage) {
        }
        /**
         * Starts the store via the superclass and then calls {@link #isEmpty()}
         * once; the result is discarded.
         *
         * @throws Exception propagated from {@code super.start()} or {@code isEmpty()}
         */
        @Override
        public void start() throws Exception {
            super.start();
            // exercise the store to ensure creation
            isEmpty();
        }
        /**
         * Stops the store; simply delegates to the superclass.
         *
         * @throws Exception propagated from {@code super.stop()}
         */
        @Override
        public void stop() throws Exception {
            super.stop();
        }
822
823        protected void lockAsyncJobQueue() {
824            try {
825                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
826                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
827                }
828            } catch (Exception e) {
829                LOG.error("Failed to lock async jobs for " + this.destination, e);
830            }
831        }
832
        /**
         * Returns {@code maxAsyncJobs} permits to the local destination semaphore.
         * The release is unconditional; it does not check whether the permits
         * were actually obtained beforehand.
         */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }
836
        /**
         * Takes a single permit from the local destination semaphore, blocking
         * until one is available.
         *
         * If the wait is interrupted the failure is logged and the method
         * returns without holding a permit.
         * NOTE(review): the interrupt status is not restored and the caller
         * cannot tell that the permit was not taken - confirm this is intended.
         */
        protected void acquireLocalAsyncLock() {
            try {
                this.localDestinationSemaphore.acquire();
            } catch (InterruptedException e) {
                LOG.error("Failed to aquire async lock for " + this.destination, e);
            }
        }
844
        /**
         * Returns a single permit to the local destination semaphore.
         */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }
848
849        @Override
850        public String toString(){
851            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
852        }
853    }
854
855    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
856        private final AtomicInteger subscriptionCount = new AtomicInteger();
857        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
858            super(destination);
859            this.subscriptionCount.set(getAllSubscriptions().length);
860            if (isConcurrentStoreAndDispatchTopics()) {
861                asyncTopicMaps.add(asyncTaskMap);
862            }
863        }
864
865        @Override
866        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
867                throws IOException {
868            if (isConcurrentStoreAndDispatchTopics()) {
869                message.beforeMarshall(wireFormat);
870                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
871                result.aquireLocks();
872                addTopicTask(this, result);
873                return result.getFuture();
874            } else {
875                return super.asyncAddTopicMessage(context, message);
876            }
877        }
878
879        @Override
880        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
881                                MessageId messageId, MessageAck ack) throws IOException {
882            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
883            if (isConcurrentStoreAndDispatchTopics()) {
884                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
885                StoreTopicTask task = null;
886                synchronized (asyncTaskMap) {
887                    task = (StoreTopicTask) asyncTaskMap.get(key);
888                }
889                if (task != null) {
890                    if (task.addSubscriptionKey(subscriptionKey)) {
891                        removeTopicTask(this, messageId);
892                        if (task.cancel()) {
893                            synchronized (asyncTaskMap) {
894                                asyncTaskMap.remove(key);
895                            }
896                        }
897                    }
898                } else {
899                    doAcknowledge(context, subscriptionKey, messageId, ack);
900                }
901            } else {
902                doAcknowledge(context, subscriptionKey, messageId, ack);
903            }
904        }
905
906        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
907                throws IOException {
908            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
909            command.setDestination(dest);
910            command.setSubscriptionKey(subscriptionKey);
911            command.setMessageId(messageId.toProducerKey());
912            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
913            if (ack != null && ack.isUnmatchedAck()) {
914                command.setAck(UNMATCHED);
915            } else {
916                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
917                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
918            }
919            store(command, false, null, null);
920        }
921
922        @Override
923        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
924            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
925                    .getSubscriptionName());
926            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
927            command.setDestination(dest);
928            command.setSubscriptionKey(subscriptionKey.toString());
929            command.setRetroactive(retroactive);
930            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
931            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
932            store(command, isEnableJournalDiskSyncs() && true, null, null);
933            this.subscriptionCount.incrementAndGet();
934        }
935
936        @Override
937        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
938            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
939            command.setDestination(dest);
940            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
941            store(command, isEnableJournalDiskSyncs() && true, null, null);
942            this.subscriptionCount.decrementAndGet();
943        }
944
945        @Override
946        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
947
948            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
949            indexLock.writeLock().lock();
950            try {
951                pageFile.tx().execute(new Transaction.Closure<IOException>() {
952                    @Override
953                    public void execute(Transaction tx) throws IOException {
954                        StoredDestination sd = getStoredDestination(dest, tx);
955                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
956                                .hasNext();) {
957                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
958                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
959                                    .getValue().getSubscriptionInfo().newInput()));
960                            subscriptions.add(info);
961
962                        }
963                    }
964                });
965            } finally {
966                indexLock.writeLock().unlock();
967            }
968
969            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
970            subscriptions.toArray(rc);
971            return rc;
972        }
973
974        @Override
975        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
976            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
977            indexLock.writeLock().lock();
978            try {
979                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
980                    @Override
981                    public SubscriptionInfo execute(Transaction tx) throws IOException {
982                        StoredDestination sd = getStoredDestination(dest, tx);
983                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
984                        if (command == null) {
985                            return null;
986                        }
987                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
988                                .getSubscriptionInfo().newInput()));
989                    }
990                });
991            } finally {
992                indexLock.writeLock().unlock();
993            }
994        }
995
996        @Override
997        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
998            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
999            indexLock.writeLock().lock();
1000            try {
1001                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
1002                    @Override
1003                    public Integer execute(Transaction tx) throws IOException {
1004                        StoredDestination sd = getStoredDestination(dest, tx);
1005                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
1006                        if (cursorPos == null) {
1007                            // The subscription might not exist.
1008                            return 0;
1009                        }
1010
1011                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
1012                    }
1013                });
1014            } finally {
1015                indexLock.writeLock().unlock();
1016            }
1017        }
1018
1019        @Override
1020        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
1021                throws Exception {
1022            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1023            @SuppressWarnings("unused")
1024            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
1025            indexLock.writeLock().lock();
1026            try {
1027                pageFile.tx().execute(new Transaction.Closure<Exception>() {
1028                    @Override
1029                    public void execute(Transaction tx) throws Exception {
1030                        StoredDestination sd = getStoredDestination(dest, tx);
1031                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
1032                        SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
1033                        //If we have ackPositions tracked then compare the first one as individual acknowledge mode
1034                        //may have bumped lastAck even though there are earlier messages to still consume
1035                        if (subAckPositions != null && !subAckPositions.isEmpty()
1036                                && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
1037                            //we have messages to ack before lastAckedSequence
1038                            sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
1039                        } else {
1040                            subAckPositions = null;
1041                            sd.orderIndex.setBatch(tx, cursorPos);
1042                        }
1043                        recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener);
1044                        Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
1045                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
1046                                .hasNext();) {
1047                            Entry<Long, MessageKeys> entry = iterator.next();
1048                            if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
1049                                continue;
1050                            }
1051                            //If subAckPositions is set then verify the sequence set contains the message still
1052                            //and if it doesn't skip it
1053                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
1054                                continue;
1055                            }
1056                            listener.recoverMessage(loadMessage(entry.getValue().location));
1057                        }
1058                        sd.orderIndex.resetCursorPosition();
1059                    }
1060                });
1061            } finally {
1062                indexLock.writeLock().unlock();
1063            }
1064        }
1065
        /**
         * Recovers the next batch of messages for one subscription.
         *
         * The position is taken from the cursor remembered for the subscription
         * in {@code sd.subscriptionCursors}; when none is remembered it is
         * derived from the subscription's last ack and tracked ack positions.
         * After iterating, a copy of the order index cursor is remembered for
         * the subscription if at least one index entry was visited.
         *
         * @param clientId client id of the subscription
         * @param subscriptionName name of the subscription
         * @param maxReturned batch size limit checked after each visited entry
         * @param listener receiver of the recovered messages
         * @throws Exception propagated from the index, the journal or the listener
         */
        @Override
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            @SuppressWarnings("unused")
            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        sd.orderIndex.resetCursorPosition();
                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                        SequenceSet subAckPositions = null;
                        if (moc == null) {
                            // no remembered cursor: derive the start position from the last ack
                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
                            if (pos == null) {
                                // sub deleted
                                return;
                            }
                            subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
                            //If we have ackPositions tracked then compare the first one as individual acknowledge mode
                            //may have bumped lastAck even though there are earlier messages to still consume
                            if (subAckPositions != null && !subAckPositions.isEmpty()
                                    && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
                                //we have messages to ack before lastAckedSequence
                                sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
                            } else {
                                // no earlier gaps: the per-entry sequence set check below is disabled
                                subAckPositions = null;
                                sd.orderIndex.setBatch(tx, pos);
                            }
                            moc = sd.orderIndex.cursor;
                        } else {
                            // resume from the cursor remembered for this subscription
                            sd.orderIndex.cursor.sync(moc);
                        }

                        Entry<Long, MessageKeys> entry = null;
                        // rolled back acks are replayed first and count towards maxReturned
                        int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener);
                        Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                .hasNext();) {
                            entry = iterator.next();
                            if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
                                continue;
                            }
                            //If subAckPositions is set then verify the sequence set contains the message still
                            //and if it doesn't skip it
                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
                                continue;
                            }
                            // only messages the listener reports as recovered count towards the batch
                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                counter++;
                            }
                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                break;
                            }
                        }
                        sd.orderIndex.stoppedIterating();
                        // entry is non-null only if the loop visited at least one index entry
                        if (entry != null) {
                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                            sd.subscriptionCursors.put(subscriptionKey, copy);
                        }
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }
1135
1136        @Override
1137        public void resetBatching(String clientId, String subscriptionName) {
1138            try {
1139                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
1140                indexLock.writeLock().lock();
1141                try {
1142                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1143                        @Override
1144                        public void execute(Transaction tx) throws IOException {
1145                            StoredDestination sd = getStoredDestination(dest, tx);
1146                            sd.subscriptionCursors.remove(subscriptionKey);
1147                        }
1148                    });
1149                }finally {
1150                    indexLock.writeLock().unlock();
1151                }
1152            } catch (IOException e) {
1153                throw new RuntimeException(e);
1154            }
1155        }
1156    }
1157
1158    String subscriptionKey(String clientId, String subscriptionName) {
1159        return clientId + ":" + subscriptionName;
1160    }
1161
1162    @Override
1163    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
1164        String key = key(convert(destination));
1165        MessageStore store = storeCache.get(key(convert(destination)));
1166        if (store == null) {
1167            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
1168            synchronized (storeCache) {
1169                store = storeCache.put(key, queueStore);
1170                if (store != null) {
1171                    storeCache.put(key, store);
1172                } else {
1173                    store = queueStore;
1174                }
1175            }
1176        }
1177
1178        return store;
1179    }
1180
1181    @Override
1182    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1183        String key = key(convert(destination));
1184        MessageStore store = storeCache.get(key(convert(destination)));
1185        if (store == null) {
1186            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1187            synchronized (storeCache) {
1188                store = storeCache.put(key, topicStore);
1189                if (store != null) {
1190                    storeCache.put(key, store);
1191                } else {
1192                    store = topicStore;
1193                }
1194            }
1195        }
1196
1197        return (TopicMessageStore) store;
1198    }
1199
1200    /**
1201     * Cleanup method to remove any state associated with the given destination.
1202     * This method does not stop the message store (it might not be cached).
1203     *
1204     * @param destination
1205     *            Destination to forget
1206     */
1207    @Override
1208    public void removeQueueMessageStore(ActiveMQQueue destination) {
1209    }
1210
1211    /**
1212     * Cleanup method to remove any state associated with the given destination
1213     * This method does not stop the message store (it might not be cached).
1214     *
1215     * @param destination
1216     *            Destination to forget
1217     */
1218    @Override
1219    public void removeTopicMessageStore(ActiveMQTopic destination) {
1220    }
1221
    /**
     * Sets the {@code deleteAllMessages} flag. No deletion is performed inside
     * this method itself.
     *
     * @throws IOException declared by the interface; not thrown by this body
     */
    @Override
    public void deleteAllMessages() throws IOException {
        deleteAllMessages = true;
    }
1226
1227    @Override
1228    public Set<ActiveMQDestination> getDestinations() {
1229        try {
1230            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1231            indexLock.writeLock().lock();
1232            try {
1233                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1234                    @Override
1235                    public void execute(Transaction tx) throws IOException {
1236                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1237                                .hasNext();) {
1238                            Entry<String, StoredDestination> entry = iterator.next();
1239                            //Removing isEmpty topic check - see AMQ-5875
1240                            rc.add(convert(entry.getKey()));
1241                        }
1242                    }
1243                });
1244            }finally {
1245                indexLock.writeLock().unlock();
1246            }
1247            return rc;
1248        } catch (IOException e) {
1249            throw new RuntimeException(e);
1250        }
1251    }
1252
    /**
     * Always returns 0.
     *
     * @return 0
     * @throws IOException declared by the interface; not thrown by this body
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return 0;
    }
1257
1258    @Override
1259    public long getLastProducerSequenceId(ProducerId id) {
1260        indexLock.writeLock().lock();
1261        try {
1262            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1263        } finally {
1264            indexLock.writeLock().unlock();
1265        }
1266    }
1267
1268    @Override
1269    public long size() {
1270        try {
1271            return journalSize.get() + getPageFile().getDiskSize();
1272        } catch (IOException e) {
1273            throw new RuntimeException(e);
1274        }
1275    }
1276
    /**
     * Not supported: always throws.
     *
     * @param context unused
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported: always throws.
     *
     * @param context unused
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported: always throws.
     *
     * @param context unused
     * @throws IOException always, with the message "Not yet implemented."
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
1289
    /**
     * Delegates to {@code super.checkpointCleanup(sync)}.
     *
     * @param sync passed through unchanged to the superclass
     * @throws IOException propagated from the superclass
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }
1294
1295    // /////////////////////////////////////////////////////////////////
1296    // Internal helper methods.
1297    // /////////////////////////////////////////////////////////////////
1298
1299    /**
1300     * @param location
1301     * @return
1302     * @throws IOException
1303     */
1304    Message loadMessage(Location location) throws IOException {
1305        try {
1306            JournalCommand<?> command = load(location);
1307            KahaAddMessageCommand addMessage = null;
1308            switch (command.type()) {
1309                case KAHA_UPDATE_MESSAGE_COMMAND:
1310                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
1311                    break;
1312                case KAHA_ADD_MESSAGE_COMMAND:
1313                    addMessage = (KahaAddMessageCommand) command;
1314                    break;
1315                default:
1316                    throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location);
1317            }
1318            if (!addMessage.hasMessage()) {
1319                throw new IOException("Could not load journal record, null message content at location: " + location);
1320            }
1321            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1322            return msg;
1323        } catch (Throwable t) {
1324            IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t);
1325            LOG.error("Failed to load message at: {}", location , ioe);
1326            brokerService.handleIOException(ioe);
1327            throw ioe;
1328        }
1329    }
1330
1331    // /////////////////////////////////////////////////////////////////
1332    // Internal conversion methods.
1333    // /////////////////////////////////////////////////////////////////
1334
1335    KahaLocation convert(Location location) {
1336        KahaLocation rc = new KahaLocation();
1337        rc.setLogId(location.getDataFileId());
1338        rc.setOffset(location.getOffset());
1339        return rc;
1340    }
1341
1342    KahaDestination convert(ActiveMQDestination dest) {
1343        KahaDestination rc = new KahaDestination();
1344        rc.setName(dest.getPhysicalName());
1345        switch (dest.getDestinationType()) {
1346        case ActiveMQDestination.QUEUE_TYPE:
1347            rc.setType(DestinationType.QUEUE);
1348            return rc;
1349        case ActiveMQDestination.TOPIC_TYPE:
1350            rc.setType(DestinationType.TOPIC);
1351            return rc;
1352        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1353            rc.setType(DestinationType.TEMP_QUEUE);
1354            return rc;
1355        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1356            rc.setType(DestinationType.TEMP_TOPIC);
1357            return rc;
1358        default:
1359            return null;
1360        }
1361    }
1362
1363    ActiveMQDestination convert(String dest) {
1364        int p = dest.indexOf(":");
1365        if (p < 0) {
1366            throw new IllegalArgumentException("Not in the valid destination format");
1367        }
1368        int type = Integer.parseInt(dest.substring(0, p));
1369        String name = dest.substring(p + 1);
1370        return convert(type, name);
1371    }
1372
1373    private ActiveMQDestination convert(KahaDestination commandDestination) {
1374        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1375    }
1376
1377    private ActiveMQDestination convert(int type, String name) {
1378        switch (KahaDestination.DestinationType.valueOf(type)) {
1379        case QUEUE:
1380            return new ActiveMQQueue(name);
1381        case TOPIC:
1382            return new ActiveMQTopic(name);
1383        case TEMP_QUEUE:
1384            return new ActiveMQTempQueue(name);
1385        case TEMP_TOPIC:
1386            return new ActiveMQTempTopic(name);
1387        default:
1388            throw new IllegalArgumentException("Not in the valid destination format");
1389        }
1390    }
1391
    /**
     * @return the currently configured {@link TransactionIdTransformer}
     */
    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }
1395
    /**
     * Replaces the {@link TransactionIdTransformer} held by this store.
     *
     * @param transactionIdTransformer the transformer to use from now on; stored as given
     */
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }
1399
1400    static class AsyncJobKey {
1401        MessageId id;
1402        ActiveMQDestination destination;
1403
1404        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1405            this.id = id;
1406            this.destination = destination;
1407        }
1408
1409        @Override
1410        public boolean equals(Object obj) {
1411            if (obj == this) {
1412                return true;
1413            }
1414            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1415                    && destination.equals(((AsyncJobKey) obj).destination);
1416        }
1417
1418        @Override
1419        public int hashCode() {
1420            return id.hashCode() + destination.hashCode();
1421        }
1422
1423        @Override
1424        public String toString() {
1425            return destination.getPhysicalName() + "-" + id;
1426        }
1427    }
1428
1429    public interface StoreTask {
1430        public boolean cancel();
1431
1432        public void aquireLocks();
1433
1434        public void releaseLocks();
1435    }
1436
1437    class StoreQueueTask implements Runnable, StoreTask {
1438        protected final Message message;
1439        protected final ConnectionContext context;
1440        protected final KahaDBMessageStore store;
1441        protected final InnerFutureTask future;
1442        protected final AtomicBoolean done = new AtomicBoolean();
1443        protected final AtomicBoolean locked = new AtomicBoolean();
1444
1445        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1446            this.store = store;
1447            this.context = context;
1448            this.message = message;
1449            this.future = new InnerFutureTask(this);
1450        }
1451
1452        public ListenableFuture<Object> getFuture() {
1453            return this.future;
1454        }
1455
1456        @Override
1457        public boolean cancel() {
1458            if (this.done.compareAndSet(false, true)) {
1459                return this.future.cancel(false);
1460            }
1461            return false;
1462        }
1463
1464        @Override
1465        public void aquireLocks() {
1466            if (this.locked.compareAndSet(false, true)) {
1467                try {
1468                    globalQueueSemaphore.acquire();
1469                    store.acquireLocalAsyncLock();
1470                    message.incrementReferenceCount();
1471                } catch (InterruptedException e) {
1472                    LOG.warn("Failed to aquire lock", e);
1473                }
1474            }
1475
1476        }
1477
1478        @Override
1479        public void releaseLocks() {
1480            if (this.locked.compareAndSet(true, false)) {
1481                store.releaseLocalAsyncLock();
1482                globalQueueSemaphore.release();
1483                message.decrementReferenceCount();
1484            }
1485        }
1486
1487        @Override
1488        public void run() {
1489            this.store.doneTasks++;
1490            try {
1491                if (this.done.compareAndSet(false, true)) {
1492                    this.store.addMessage(context, message);
1493                    removeQueueTask(this.store, this.message.getMessageId());
1494                    this.future.complete();
1495                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
1496                    System.err.println(this.store.dest.getName() + " cancelled: "
1497                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1498                    this.store.canceledTasks = this.store.doneTasks = 0;
1499                }
1500            } catch (Throwable t) {
1501                this.future.setException(t);
1502                removeQueueTask(this.store, this.message.getMessageId());
1503            }
1504        }
1505
1506        protected Message getMessage() {
1507            return this.message;
1508        }
1509
1510        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1511
1512            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();
1513
1514            public InnerFutureTask(Runnable runnable) {
1515                super(runnable, null);
1516            }
1517
1518            @Override
1519            public void setException(final Throwable e) {
1520                super.setException(e);
1521            }
1522
1523            public void complete() {
1524                super.set(null);
1525            }
1526
1527            @Override
1528            public void done() {
1529                fireListener();
1530            }
1531
1532            @Override
1533            public void addListener(Runnable listener) {
1534                this.listenerRef.set(listener);
1535                if (isDone()) {
1536                    fireListener();
1537                }
1538            }
1539
1540            private void fireListener() {
1541                Runnable listener = listenerRef.getAndSet(null);
1542                if (listener != null) {
1543                    try {
1544                        listener.run();
1545                    } catch (Exception ignored) {
1546                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1547                    }
1548                }
1549            }
1550        }
1551    }
1552
1553    class StoreTopicTask extends StoreQueueTask {
1554        private final int subscriptionCount;
1555        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1556        private final KahaDBTopicMessageStore topicStore;
1557        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1558                int subscriptionCount) {
1559            super(store, context, message);
1560            this.topicStore = store;
1561            this.subscriptionCount = subscriptionCount;
1562
1563        }
1564
1565        @Override
1566        public void aquireLocks() {
1567            if (this.locked.compareAndSet(false, true)) {
1568                try {
1569                    globalTopicSemaphore.acquire();
1570                    store.acquireLocalAsyncLock();
1571                    message.incrementReferenceCount();
1572                } catch (InterruptedException e) {
1573                    LOG.warn("Failed to aquire lock", e);
1574                }
1575            }
1576        }
1577
1578        @Override
1579        public void releaseLocks() {
1580            if (this.locked.compareAndSet(true, false)) {
1581                message.decrementReferenceCount();
1582                store.releaseLocalAsyncLock();
1583                globalTopicSemaphore.release();
1584            }
1585        }
1586
1587        /**
1588         * add a key
1589         *
1590         * @param key
1591         * @return true if all acknowledgements received
1592         */
1593        public boolean addSubscriptionKey(String key) {
1594            synchronized (this.subscriptionKeys) {
1595                this.subscriptionKeys.add(key);
1596            }
1597            return this.subscriptionKeys.size() >= this.subscriptionCount;
1598        }
1599
1600        @Override
1601        public void run() {
1602            this.store.doneTasks++;
1603            try {
1604                if (this.done.compareAndSet(false, true)) {
1605                    this.topicStore.addMessage(context, message);
1606                    // apply any acks we have
1607                    synchronized (this.subscriptionKeys) {
1608                        for (String key : this.subscriptionKeys) {
1609                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1610
1611                        }
1612                    }
1613                    removeTopicTask(this.topicStore, this.message.getMessageId());
1614                    this.future.complete();
1615                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1616                    System.err.println(this.store.dest.getName() + " cancelled: "
1617                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1618                    this.store.canceledTasks = this.store.doneTasks = 0;
1619                }
1620            } catch (Throwable t) {
1621                this.future.setException(t);
1622                removeTopicTask(this.topicStore, this.message.getMessageId());
1623            }
1624        }
1625    }
1626
1627    public class StoreTaskExecutor extends ThreadPoolExecutor {
1628
1629        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1630            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1631        }
1632
1633        @Override
1634        protected void afterExecute(Runnable runnable, Throwable throwable) {
1635            super.afterExecute(runnable, throwable);
1636
1637            if (runnable instanceof StoreTask) {
1638               ((StoreTask)runnable).releaseLocks();
1639            }
1640        }
1641    }
1642
    /**
     * Creates a new {@link JobSchedulerStoreImpl}.
     *
     * @return a freshly constructed {@link JobSchedulerStoreImpl}
     * @throws IOException declared by the signature; nothing in this body throws it explicitly
     * @throws UnsupportedOperationException declared by the signature; nothing in this body throws it explicitly
     */
    @Override
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        return new JobSchedulerStoreImpl();
    }
1647}