001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.Set;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.BrokerServiceAware;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.scheduler.JobSchedulerStore;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQQueue;
034import org.apache.activemq.command.ActiveMQTempQueue;
035import org.apache.activemq.command.ActiveMQTempTopic;
036import org.apache.activemq.command.ActiveMQTopic;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.ProducerId;
041import org.apache.activemq.command.SubscriptionInfo;
042import org.apache.activemq.command.TransactionId;
043import org.apache.activemq.command.XATransactionId;
044import org.apache.activemq.openwire.OpenWireFormat;
045import org.apache.activemq.protobuf.Buffer;
046import org.apache.activemq.store.AbstractMessageStore;
047import org.apache.activemq.store.MessageRecoveryListener;
048import org.apache.activemq.store.MessageStore;
049import org.apache.activemq.store.PersistenceAdapter;
050import org.apache.activemq.store.TopicMessageStore;
051import org.apache.activemq.store.TransactionRecoveryListener;
052import org.apache.activemq.store.TransactionStore;
053import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
054import org.apache.activemq.store.kahadb.data.KahaDestination;
055import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
056import org.apache.activemq.store.kahadb.data.KahaLocation;
057import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
058import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
059import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
060import org.apache.activemq.store.kahadb.disk.journal.Location;
061import org.apache.activemq.store.kahadb.disk.page.Transaction;
062import org.apache.activemq.usage.MemoryUsage;
063import org.apache.activemq.usage.SystemUsage;
064import org.apache.activemq.util.ByteSequence;
065import org.apache.activemq.wireformat.WireFormat;
066
067public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
068
069    private final WireFormat wireFormat = new OpenWireFormat();
070    private BrokerService brokerService;
071
072    @Override
073    public void setBrokerName(String brokerName) {
074    }
075    @Override
076    public void setUsageManager(SystemUsage usageManager) {
077    }
078
079    @Override
080    public TransactionStore createTransactionStore() throws IOException {
081        return new TransactionStore(){
082
083            @Override
084            public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
085                if (preCommit != null) {
086                    preCommit.run();
087                }
088                processCommit(txid);
089                if (postCommit != null) {
090                    postCommit.run();
091                }
092            }
093            @Override
094            public void prepare(TransactionId txid) throws IOException {
095                processPrepare(txid);
096            }
097            @Override
098            public void rollback(TransactionId txid) throws IOException {
099                processRollback(txid);
100            }
101            @Override
102            public void recover(TransactionRecoveryListener listener) throws IOException {
103                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
104                    XATransactionId xid = (XATransactionId)entry.getKey();
105                    ArrayList<Message> messageList = new ArrayList<Message>();
106                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
107
108                    for (Operation op : entry.getValue()) {
109                        if( op.getClass() == AddOpperation.class ) {
110                            AddOpperation addOp = (AddOpperation)op;
111                            Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
112                            messageList.add(msg);
113                        } else {
114                            RemoveOpperation rmOp = (RemoveOpperation)op;
115                            MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
116                            ackList.add(ack);
117                        }
118                    }
119
120                    Message[] addedMessages = new Message[messageList.size()];
121                    MessageAck[] acks = new MessageAck[ackList.size()];
122                    messageList.toArray(addedMessages);
123                    ackList.toArray(acks);
124                    listener.recover(xid, addedMessages, acks);
125                }
126            }
127            @Override
128            public void start() throws Exception {
129            }
130            @Override
131            public void stop() throws Exception {
132            }
133        };
134    }
135
136    public class KahaDBMessageStore extends AbstractMessageStore {
137        protected KahaDestination dest;
138
139        public KahaDBMessageStore(ActiveMQDestination destination) {
140            super(destination);
141            this.dest = convert( destination );
142        }
143
144        @Override
145        public ActiveMQDestination getDestination() {
146            return destination;
147        }
148
149        @Override
150        public void addMessage(ConnectionContext context, Message message) throws IOException {
151            KahaAddMessageCommand command = new KahaAddMessageCommand();
152            command.setDestination(dest);
153            command.setMessageId(message.getMessageId().toProducerKey());
154            processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
155        }
156
157        @Override
158        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
159            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
160            command.setDestination(dest);
161            command.setMessageId(ack.getLastMessageId().toProducerKey());
162            processRemove(command, ack.getTransactionId());
163        }
164
165        @Override
166        public void removeAllMessages(ConnectionContext context) throws IOException {
167            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
168            command.setDestination(dest);
169            process(command);
170        }
171
172        @Override
173        public Message getMessage(MessageId identity) throws IOException {
174            final String key = identity.toProducerKey();
175
176            // Hopefully one day the page file supports concurrent read operations... but for now we must
177            // externally synchronize...
178            ByteSequence data;
179            synchronized(indexMutex) {
180                data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
181                    @Override
182                    public ByteSequence execute(Transaction tx) throws IOException {
183                        StoredDestination sd = getStoredDestination(dest, tx);
184                        Long sequence = sd.messageIdIndex.get(tx, key);
185                        if( sequence ==null ) {
186                            return null;
187                        }
188                        return sd.orderIndex.get(tx, sequence).data;
189                    }
190                });
191            }
192            if( data == null ) {
193                return null;
194            }
195
196            Message msg = (Message)wireFormat.unmarshal( data );
197            return msg;
198        }
199
200        @Override
201        public int getMessageCount() throws IOException {
202            synchronized(indexMutex) {
203                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
204                    @Override
205                    public Integer execute(Transaction tx) throws IOException {
206                        // Iterate through all index entries to get a count of messages in the destination.
207                        StoredDestination sd = getStoredDestination(dest, tx);
208                        int rc=0;
209                        for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
210                            iterator.next();
211                            rc++;
212                        }
213                        return rc;
214                    }
215                });
216            }
217        }
218
219        @Override
220        public void recover(final MessageRecoveryListener listener) throws Exception {
221            synchronized(indexMutex) {
222                pageFile.tx().execute(new Transaction.Closure<Exception>(){
223                    @Override
224                    public void execute(Transaction tx) throws Exception {
225                        StoredDestination sd = getStoredDestination(dest, tx);
226                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
227                            Entry<Long, MessageRecord> entry = iterator.next();
228                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
229                        }
230                    }
231                });
232            }
233        }
234
235        long cursorPos=0;
236
237        @Override
238        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
239            synchronized(indexMutex) {
240                pageFile.tx().execute(new Transaction.Closure<Exception>(){
241                    @Override
242                    public void execute(Transaction tx) throws Exception {
243                        StoredDestination sd = getStoredDestination(dest, tx);
244                        Entry<Long, MessageRecord> entry=null;
245                        int counter = 0;
246                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
247                            entry = iterator.next();
248                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
249                            counter++;
250                            if( counter >= maxReturned ) {
251                                break;
252                            }
253                        }
254                        if( entry!=null ) {
255                            cursorPos = entry.getKey()+1;
256                        }
257                    }
258                });
259            }
260        }
261
262        @Override
263        public void resetBatching() {
264            cursorPos=0;
265        }
266
267
268        @Override
269        public void setBatch(MessageId identity) throws IOException {
270            final String key = identity.toProducerKey();
271
272            // Hopefully one day the page file supports concurrent read operations... but for now we must
273            // externally synchronize...
274            Long location;
275            synchronized(indexMutex) {
276                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
277                    @Override
278                    public Long execute(Transaction tx) throws IOException {
279                        StoredDestination sd = getStoredDestination(dest, tx);
280                        return sd.messageIdIndex.get(tx, key);
281                    }
282                });
283            }
284            if( location!=null ) {
285                cursorPos=location+1;
286            }
287
288        }
289
290        @Override
291        public void setMemoryUsage(MemoryUsage memoryUsage) {
292        }
293        @Override
294        public void start() throws Exception {
295        }
296        @Override
297        public void stop() throws Exception {
298        }
299
300    }
301
302    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
303        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
304            super(destination);
305        }
306
307        @Override
308        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
309                                MessageId messageId, MessageAck ack) throws IOException {
310            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
311            command.setDestination(dest);
312            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
313            command.setMessageId(messageId.toProducerKey());
314            // We are not passed a transaction info.. so we can't participate in a transaction.
315            // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
316            // to pass back to the XA recover method.
317            // command.setTransactionInfo();
318            processRemove(command, null);
319        }
320
321        @Override
322        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
323            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
324            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
325            command.setDestination(dest);
326            command.setSubscriptionKey(subscriptionKey);
327            command.setRetroactive(retroactive);
328            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
329            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
330            process(command);
331        }
332
333        @Override
334        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
335            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
336            command.setDestination(dest);
337            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
338            process(command);
339        }
340
341        @Override
342        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
343
344            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
345            synchronized(indexMutex) {
346                pageFile.tx().execute(new Transaction.Closure<IOException>(){
347                    @Override
348                    public void execute(Transaction tx) throws IOException {
349                        StoredDestination sd = getStoredDestination(dest, tx);
350                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
351                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
352                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
353                            subscriptions.add(info);
354
355                        }
356                    }
357                });
358            }
359
360            SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
361            subscriptions.toArray(rc);
362            return rc;
363        }
364
365        @Override
366        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
367            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
368            synchronized(indexMutex) {
369                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
370                    @Override
371                    public SubscriptionInfo execute(Transaction tx) throws IOException {
372                        StoredDestination sd = getStoredDestination(dest, tx);
373                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
374                        if( command ==null ) {
375                            return null;
376                        }
377                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
378                    }
379                });
380            }
381        }
382
383        @Override
384        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
385            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
386            synchronized(indexMutex) {
387                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
388                    @Override
389                    public Integer execute(Transaction tx) throws IOException {
390                        StoredDestination sd = getStoredDestination(dest, tx);
391                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
392                        if ( cursorPos==null ) {
393                            // The subscription might not exist.
394                            return 0;
395                        }
396                        cursorPos += 1;
397
398                        int counter = 0;
399                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
400                            iterator.next();
401                            counter++;
402                        }
403                        return counter;
404                    }
405                });
406            }
407        }
408
409        @Override
410        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
411            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
412            synchronized(indexMutex) {
413                pageFile.tx().execute(new Transaction.Closure<Exception>(){
414                    @Override
415                    public void execute(Transaction tx) throws Exception {
416                        StoredDestination sd = getStoredDestination(dest, tx);
417                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
418                        cursorPos += 1;
419
420                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
421                            Entry<Long, MessageRecord> entry = iterator.next();
422                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
423                        }
424                    }
425                });
426            }
427        }
428
429        @Override
430        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
431            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
432            synchronized(indexMutex) {
433                pageFile.tx().execute(new Transaction.Closure<Exception>(){
434                    @Override
435                    public void execute(Transaction tx) throws Exception {
436                        StoredDestination sd = getStoredDestination(dest, tx);
437                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
438                        if( cursorPos == null ) {
439                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
440                            cursorPos += 1;
441                        }
442
443                        Entry<Long, MessageRecord> entry=null;
444                        int counter = 0;
445                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
446                            entry = iterator.next();
447                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
448                            counter++;
449                            if( counter >= maxReturned ) {
450                                break;
451                            }
452                        }
453                        if( entry!=null ) {
454                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
455                        }
456                    }
457                });
458            }
459        }
460
461        @Override
462        public void resetBatching(String clientId, String subscriptionName) {
463            try {
464                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
465                synchronized(indexMutex) {
466                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
467                        @Override
468                        public void execute(Transaction tx) throws IOException {
469                            StoredDestination sd = getStoredDestination(dest, tx);
470                            sd.subscriptionCursors.remove(subscriptionKey);
471                        }
472                    });
473                }
474            } catch (IOException e) {
475                throw new RuntimeException(e);
476            }
477        }
478    }
479
480    String subscriptionKey(String clientId, String subscriptionName){
481        return clientId+":"+subscriptionName;
482    }
483
484    @Override
485    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
486        return new KahaDBMessageStore(destination);
487    }
488
489    @Override
490    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
491        return new KahaDBTopicMessageStore(destination);
492    }
493
494    /**
495     * Cleanup method to remove any state associated with the given destination.
496     * This method does not stop the message store (it might not be cached).
497     *
498     * @param destination Destination to forget
499     */
500    @Override
501    public void removeQueueMessageStore(ActiveMQQueue destination) {
502    }
503
504    /**
505     * Cleanup method to remove any state associated with the given destination
506     * This method does not stop the message store (it might not be cached).
507     *
508     * @param destination Destination to forget
509     */
510    @Override
511    public void removeTopicMessageStore(ActiveMQTopic destination) {
512    }
513
514    @Override
515    public void deleteAllMessages() throws IOException {
516    }
517
518
519    @Override
520    public Set<ActiveMQDestination> getDestinations() {
521        try {
522            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
523            synchronized(indexMutex) {
524                pageFile.tx().execute(new Transaction.Closure<IOException>(){
525                    @Override
526                    public void execute(Transaction tx) throws IOException {
527                        for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
528                            Entry<String, StoredDestination> entry = iterator.next();
529                            rc.add(convert(entry.getKey()));
530                        }
531                    }
532                });
533            }
534            return rc;
535        } catch (IOException e) {
536            throw new RuntimeException(e);
537        }
538    }
539
540    @Override
541    public long getLastMessageBrokerSequenceId() throws IOException {
542        return 0;
543    }
544
545    @Override
546    public long size() {
547        if ( !started.get() ) {
548            return 0;
549        }
550        try {
551            return pageFile.getDiskSize();
552        } catch (IOException e) {
553            throw new RuntimeException(e);
554        }
555    }
556
557    @Override
558    public void beginTransaction(ConnectionContext context) throws IOException {
559        throw new IOException("Not yet implemented.");
560    }
561    @Override
562    public void commitTransaction(ConnectionContext context) throws IOException {
563        throw new IOException("Not yet implemented.");
564    }
565    @Override
566    public void rollbackTransaction(ConnectionContext context) throws IOException {
567        throw new IOException("Not yet implemented.");
568    }
569
570    @Override
571    public void checkpoint(boolean sync) throws IOException {
572    }
573
574    ///////////////////////////////////////////////////////////////////
575    // Internal conversion methods.
576    ///////////////////////////////////////////////////////////////////
577
578
579
580    KahaLocation convert(Location location) {
581        KahaLocation rc = new KahaLocation();
582        rc.setLogId(location.getDataFileId());
583        rc.setOffset(location.getOffset());
584        return rc;
585    }
586
587    KahaDestination convert(ActiveMQDestination dest) {
588        KahaDestination rc = new KahaDestination();
589        rc.setName(dest.getPhysicalName());
590        switch( dest.getDestinationType() ) {
591        case ActiveMQDestination.QUEUE_TYPE:
592            rc.setType(DestinationType.QUEUE);
593            return rc;
594        case ActiveMQDestination.TOPIC_TYPE:
595            rc.setType(DestinationType.TOPIC);
596            return rc;
597        case ActiveMQDestination.TEMP_QUEUE_TYPE:
598            rc.setType(DestinationType.TEMP_QUEUE);
599            return rc;
600        case ActiveMQDestination.TEMP_TOPIC_TYPE:
601            rc.setType(DestinationType.TEMP_TOPIC);
602            return rc;
603        default:
604            return null;
605        }
606    }
607
608    ActiveMQDestination convert(String dest) {
609        int p = dest.indexOf(":");
610        if( p<0 ) {
611            throw new IllegalArgumentException("Not in the valid destination format");
612        }
613        int type = Integer.parseInt(dest.substring(0, p));
614        String name = dest.substring(p+1);
615
616        switch( KahaDestination.DestinationType.valueOf(type) ) {
617        case QUEUE:
618            return new ActiveMQQueue(name);
619        case TOPIC:
620            return new ActiveMQTopic(name);
621        case TEMP_QUEUE:
622            return new ActiveMQTempQueue(name);
623        case TEMP_TOPIC:
624            return new ActiveMQTempTopic(name);
625        default:
626            throw new IllegalArgumentException("Not in the valid destination format");
627        }
628    }
629
630    @Override
631    public long getLastProducerSequenceId(ProducerId id) {
632        return -1;
633    }
634
635    @Override
636    public void allowIOResumption() {
637        if (pageFile != null) {
638            pageFile.allowIOResumption();
639        }
640    }
641
642    @Override
643    public void setBrokerService(BrokerService brokerService) {
644        this.brokerService = brokerService;
645    }
646
647    @Override
648    public void load() throws IOException {
649        if( brokerService!=null ) {
650            wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
651        }
652        super.load();
653    }
654    @Override
655    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
656        throw new UnsupportedOperationException();
657    }
658}