001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.Map.Entry; 026import java.util.Set; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.BrokerServiceAware; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.scheduler.JobSchedulerStore; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQQueue; 034import org.apache.activemq.command.ActiveMQTempQueue; 035import org.apache.activemq.command.ActiveMQTempTopic; 036import org.apache.activemq.command.ActiveMQTopic; 037import org.apache.activemq.command.Message; 038import org.apache.activemq.command.MessageAck; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.ProducerId; 041import org.apache.activemq.command.SubscriptionInfo; 042import org.apache.activemq.command.TransactionId; 043import org.apache.activemq.command.XATransactionId; 044import org.apache.activemq.openwire.OpenWireFormat; 045import org.apache.activemq.protobuf.Buffer; 046import org.apache.activemq.store.AbstractMessageStore; 047import org.apache.activemq.store.MessageRecoveryListener; 048import org.apache.activemq.store.MessageStore; 049import org.apache.activemq.store.PersistenceAdapter; 050import org.apache.activemq.store.TopicMessageStore; 051import org.apache.activemq.store.TransactionRecoveryListener; 052import org.apache.activemq.store.TransactionStore; 053import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 054import org.apache.activemq.store.kahadb.data.KahaDestination; 055import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 056import org.apache.activemq.store.kahadb.data.KahaLocation; 057import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 058import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 059import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 060import org.apache.activemq.store.kahadb.disk.journal.Location; 061import org.apache.activemq.store.kahadb.disk.page.Transaction; 062import org.apache.activemq.usage.MemoryUsage; 063import org.apache.activemq.usage.SystemUsage; 064import org.apache.activemq.util.ByteSequence; 065import org.apache.activemq.wireformat.WireFormat; 066 067public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware { 068 069 private final WireFormat wireFormat = new OpenWireFormat(); 070 private BrokerService brokerService; 071 072 @Override 073 public void setBrokerName(String brokerName) { 074 } 075 @Override 076 public void setUsageManager(SystemUsage usageManager) { 077 } 078 079 @Override 080 public TransactionStore createTransactionStore() throws IOException { 081 return new TransactionStore(){ 082 083 @Override 084 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 085 if (preCommit != null) { 086 preCommit.run(); 087 } 088 processCommit(txid); 089 if (postCommit != null) { 090 postCommit.run(); 091 } 092 } 093 @Override 094 public void prepare(TransactionId txid) throws IOException { 095 processPrepare(txid); 096 } 097 @Override 098 public void rollback(TransactionId txid) throws IOException { 099 processRollback(txid); 100 } 101 @Override 102 public void recover(TransactionRecoveryListener listener) throws IOException { 103 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) { 104 XATransactionId xid = (XATransactionId)entry.getKey(); 105 ArrayList<Message> messageList = new ArrayList<Message>(); 106 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 107 108 for (Operation op : entry.getValue()) { 109 if( op.getClass() == AddOpperation.class ) { 110 AddOpperation addOp = (AddOpperation)op; 111 Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) ); 112 messageList.add(msg); 113 } else { 114 RemoveOpperation rmOp = (RemoveOpperation)op; 115 MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) ); 116 ackList.add(ack); 117 } 118 } 119 120 Message[] addedMessages = new Message[messageList.size()]; 121 MessageAck[] acks = new MessageAck[ackList.size()]; 122 messageList.toArray(addedMessages); 123 ackList.toArray(acks); 124 listener.recover(xid, addedMessages, acks); 125 } 126 } 127 @Override 128 public void start() throws Exception { 129 } 130 @Override 131 public void stop() throws Exception { 132 } 133 }; 134 } 135 136 public class KahaDBMessageStore extends AbstractMessageStore { 137 protected KahaDestination dest; 138 139 public KahaDBMessageStore(ActiveMQDestination destination) { 140 super(destination); 141 this.dest = convert( destination ); 142 } 143 144 @Override 145 public ActiveMQDestination getDestination() { 146 return destination; 147 } 148 149 @Override 150 public void addMessage(ConnectionContext context, Message message) throws IOException { 151 KahaAddMessageCommand command = new KahaAddMessageCommand(); 152 command.setDestination(dest); 153 command.setMessageId(message.getMessageId().toProducerKey()); 154 processAdd(command, message.getTransactionId(), wireFormat.marshal(message)); 155 } 156 157 @Override 158 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 159 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 160 command.setDestination(dest); 161 command.setMessageId(ack.getLastMessageId().toProducerKey()); 162 processRemove(command, ack.getTransactionId()); 163 } 164 165 @Override 166 public void removeAllMessages(ConnectionContext context) throws IOException { 167 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 168 command.setDestination(dest); 169 process(command); 170 } 171 172 @Override 173 public Message getMessage(MessageId identity) throws IOException { 174 final String key = identity.toProducerKey(); 175 176 // Hopefully one day the page file supports concurrent read operations... but for now we must 177 // externally synchronize... 178 ByteSequence data; 179 synchronized(indexMutex) { 180 data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){ 181 @Override 182 public ByteSequence execute(Transaction tx) throws IOException { 183 StoredDestination sd = getStoredDestination(dest, tx); 184 Long sequence = sd.messageIdIndex.get(tx, key); 185 if( sequence ==null ) { 186 return null; 187 } 188 return sd.orderIndex.get(tx, sequence).data; 189 } 190 }); 191 } 192 if( data == null ) { 193 return null; 194 } 195 196 Message msg = (Message)wireFormat.unmarshal( data ); 197 return msg; 198 } 199 200 @Override 201 public int getMessageCount() throws IOException { 202 synchronized(indexMutex) { 203 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 204 @Override 205 public Integer execute(Transaction tx) throws IOException { 206 // Iterate through all index entries to get a count of messages in the destination. 207 StoredDestination sd = getStoredDestination(dest, tx); 208 int rc=0; 209 for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) { 210 iterator.next(); 211 rc++; 212 } 213 return rc; 214 } 215 }); 216 } 217 } 218 219 @Override 220 public void recover(final MessageRecoveryListener listener) throws Exception { 221 synchronized(indexMutex) { 222 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 223 @Override 224 public void execute(Transaction tx) throws Exception { 225 StoredDestination sd = getStoredDestination(dest, tx); 226 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { 227 Entry<Long, MessageRecord> entry = iterator.next(); 228 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) ); 229 } 230 } 231 }); 232 } 233 } 234 235 long cursorPos=0; 236 237 @Override 238 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 239 synchronized(indexMutex) { 240 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 241 @Override 242 public void execute(Transaction tx) throws Exception { 243 StoredDestination sd = getStoredDestination(dest, tx); 244 Entry<Long, MessageRecord> entry=null; 245 int counter = 0; 246 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 247 entry = iterator.next(); 248 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 249 counter++; 250 if( counter >= maxReturned ) { 251 break; 252 } 253 } 254 if( entry!=null ) { 255 cursorPos = entry.getKey()+1; 256 } 257 } 258 }); 259 } 260 } 261 262 @Override 263 public void resetBatching() { 264 cursorPos=0; 265 } 266 267 268 @Override 269 public void setBatch(MessageId identity) throws IOException { 270 final String key = identity.toProducerKey(); 271 272 // Hopefully one day the page file supports concurrent read operations... but for now we must 273 // externally synchronize... 274 Long location; 275 synchronized(indexMutex) { 276 location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){ 277 @Override 278 public Long execute(Transaction tx) throws IOException { 279 StoredDestination sd = getStoredDestination(dest, tx); 280 return sd.messageIdIndex.get(tx, key); 281 } 282 }); 283 } 284 if( location!=null ) { 285 cursorPos=location+1; 286 } 287 288 } 289 290 @Override 291 public void setMemoryUsage(MemoryUsage memoryUsage) { 292 } 293 @Override 294 public void start() throws Exception { 295 } 296 @Override 297 public void stop() throws Exception { 298 } 299 300 } 301 302 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 303 public KahaDBTopicMessageStore(ActiveMQTopic destination) { 304 super(destination); 305 } 306 307 @Override 308 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 309 MessageId messageId, MessageAck ack) throws IOException { 310 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 311 command.setDestination(dest); 312 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 313 command.setMessageId(messageId.toProducerKey()); 314 // We are not passed a transaction info.. so we can't participate in a transaction. 315 // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack 316 // to pass back to the XA recover method. 317 // command.setTransactionInfo(); 318 processRemove(command, null); 319 } 320 321 @Override 322 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 323 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); 324 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 325 command.setDestination(dest); 326 command.setSubscriptionKey(subscriptionKey); 327 command.setRetroactive(retroactive); 328 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 329 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 330 process(command); 331 } 332 333 @Override 334 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 335 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 336 command.setDestination(dest); 337 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); 338 process(command); 339 } 340 341 @Override 342 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 343 344 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 345 synchronized(indexMutex) { 346 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 347 @Override 348 public void execute(Transaction tx) throws IOException { 349 StoredDestination sd = getStoredDestination(dest, tx); 350 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) { 351 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 352 SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) ); 353 subscriptions.add(info); 354 355 } 356 } 357 }); 358 } 359 360 SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()]; 361 subscriptions.toArray(rc); 362 return rc; 363 } 364 365 @Override 366 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 367 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 368 synchronized(indexMutex) { 369 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){ 370 @Override 371 public SubscriptionInfo execute(Transaction tx) throws IOException { 372 StoredDestination sd = getStoredDestination(dest, tx); 373 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 374 if( command ==null ) { 375 return null; 376 } 377 return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) ); 378 } 379 }); 380 } 381 } 382 383 @Override 384 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 385 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 386 synchronized(indexMutex) { 387 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){ 388 @Override 389 public Integer execute(Transaction tx) throws IOException { 390 StoredDestination sd = getStoredDestination(dest, tx); 391 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 392 if ( cursorPos==null ) { 393 // The subscription might not exist. 394 return 0; 395 } 396 cursorPos += 1; 397 398 int counter = 0; 399 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 400 iterator.next(); 401 counter++; 402 } 403 return counter; 404 } 405 }); 406 } 407 } 408 409 @Override 410 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 411 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 412 synchronized(indexMutex) { 413 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 414 @Override 415 public void execute(Transaction tx) throws Exception { 416 StoredDestination sd = getStoredDestination(dest, tx); 417 Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 418 cursorPos += 1; 419 420 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 421 Entry<Long, MessageRecord> entry = iterator.next(); 422 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 423 } 424 } 425 }); 426 } 427 } 428 429 @Override 430 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { 431 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 432 synchronized(indexMutex) { 433 pageFile.tx().execute(new Transaction.Closure<Exception>(){ 434 @Override 435 public void execute(Transaction tx) throws Exception { 436 StoredDestination sd = getStoredDestination(dest, tx); 437 Long cursorPos = sd.subscriptionCursors.get(subscriptionKey); 438 if( cursorPos == null ) { 439 cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); 440 cursorPos += 1; 441 } 442 443 Entry<Long, MessageRecord> entry=null; 444 int counter = 0; 445 for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { 446 entry = iterator.next(); 447 listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); 448 counter++; 449 if( counter >= maxReturned ) { 450 break; 451 } 452 } 453 if( entry!=null ) { 454 sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); 455 } 456 } 457 }); 458 } 459 } 460 461 @Override 462 public void resetBatching(String clientId, String subscriptionName) { 463 try { 464 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 465 synchronized(indexMutex) { 466 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 467 @Override 468 public void execute(Transaction tx) throws IOException { 469 StoredDestination sd = getStoredDestination(dest, tx); 470 sd.subscriptionCursors.remove(subscriptionKey); 471 } 472 }); 473 } 474 } catch (IOException e) { 475 throw new RuntimeException(e); 476 } 477 } 478 } 479 480 String subscriptionKey(String clientId, String subscriptionName){ 481 return clientId+":"+subscriptionName; 482 } 483 484 @Override 485 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 486 return new KahaDBMessageStore(destination); 487 } 488 489 @Override 490 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 491 return new KahaDBTopicMessageStore(destination); 492 } 493 494 /** 495 * Cleanup method to remove any state associated with the given destination. 496 * This method does not stop the message store (it might not be cached). 497 * 498 * @param destination Destination to forget 499 */ 500 @Override 501 public void removeQueueMessageStore(ActiveMQQueue destination) { 502 } 503 504 /** 505 * Cleanup method to remove any state associated with the given destination 506 * This method does not stop the message store (it might not be cached). 507 * 508 * @param destination Destination to forget 509 */ 510 @Override 511 public void removeTopicMessageStore(ActiveMQTopic destination) { 512 } 513 514 @Override 515 public void deleteAllMessages() throws IOException { 516 } 517 518 519 @Override 520 public Set<ActiveMQDestination> getDestinations() { 521 try { 522 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 523 synchronized(indexMutex) { 524 pageFile.tx().execute(new Transaction.Closure<IOException>(){ 525 @Override 526 public void execute(Transaction tx) throws IOException { 527 for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) { 528 Entry<String, StoredDestination> entry = iterator.next(); 529 rc.add(convert(entry.getKey())); 530 } 531 } 532 }); 533 } 534 return rc; 535 } catch (IOException e) { 536 throw new RuntimeException(e); 537 } 538 } 539 540 @Override 541 public long getLastMessageBrokerSequenceId() throws IOException { 542 return 0; 543 } 544 545 @Override 546 public long size() { 547 if ( !started.get() ) { 548 return 0; 549 } 550 try { 551 return pageFile.getDiskSize(); 552 } catch (IOException e) { 553 throw new RuntimeException(e); 554 } 555 } 556 557 @Override 558 public void beginTransaction(ConnectionContext context) throws IOException { 559 throw new IOException("Not yet implemented."); 560 } 561 @Override 562 public void commitTransaction(ConnectionContext context) throws IOException { 563 throw new IOException("Not yet implemented."); 564 } 565 @Override 566 public void rollbackTransaction(ConnectionContext context) throws IOException { 567 throw new IOException("Not yet implemented."); 568 } 569 570 @Override 571 public void checkpoint(boolean sync) throws IOException { 572 } 573 574 /////////////////////////////////////////////////////////////////// 575 // Internal conversion methods. 576 /////////////////////////////////////////////////////////////////// 577 578 579 580 KahaLocation convert(Location location) { 581 KahaLocation rc = new KahaLocation(); 582 rc.setLogId(location.getDataFileId()); 583 rc.setOffset(location.getOffset()); 584 return rc; 585 } 586 587 KahaDestination convert(ActiveMQDestination dest) { 588 KahaDestination rc = new KahaDestination(); 589 rc.setName(dest.getPhysicalName()); 590 switch( dest.getDestinationType() ) { 591 case ActiveMQDestination.QUEUE_TYPE: 592 rc.setType(DestinationType.QUEUE); 593 return rc; 594 case ActiveMQDestination.TOPIC_TYPE: 595 rc.setType(DestinationType.TOPIC); 596 return rc; 597 case ActiveMQDestination.TEMP_QUEUE_TYPE: 598 rc.setType(DestinationType.TEMP_QUEUE); 599 return rc; 600 case ActiveMQDestination.TEMP_TOPIC_TYPE: 601 rc.setType(DestinationType.TEMP_TOPIC); 602 return rc; 603 default: 604 return null; 605 } 606 } 607 608 ActiveMQDestination convert(String dest) { 609 int p = dest.indexOf(":"); 610 if( p<0 ) { 611 throw new IllegalArgumentException("Not in the valid destination format"); 612 } 613 int type = Integer.parseInt(dest.substring(0, p)); 614 String name = dest.substring(p+1); 615 616 switch( KahaDestination.DestinationType.valueOf(type) ) { 617 case QUEUE: 618 return new ActiveMQQueue(name); 619 case TOPIC: 620 return new ActiveMQTopic(name); 621 case TEMP_QUEUE: 622 return new ActiveMQTempQueue(name); 623 case TEMP_TOPIC: 624 return new ActiveMQTempTopic(name); 625 default: 626 throw new IllegalArgumentException("Not in the valid destination format"); 627 } 628 } 629 630 @Override 631 public long getLastProducerSequenceId(ProducerId id) { 632 return -1; 633 } 634 635 @Override 636 public void setBrokerService(BrokerService brokerService) { 637 this.brokerService = brokerService; 638 } 639 640 @Override 641 public void load() throws IOException { 642 if( brokerService!=null ) { 643 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 644 } 645 super.load(); 646 } 647 @Override 648 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 649 throw new UnsupportedOperationException(); 650 } 651}