/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
047import org.apache.activemq.broker.scheduler.JobSchedulerStore; 048import org.apache.activemq.command.ActiveMQDestination; 049import org.apache.activemq.command.ActiveMQQueue; 050import org.apache.activemq.command.ActiveMQTempQueue; 051import org.apache.activemq.command.ActiveMQTempTopic; 052import org.apache.activemq.command.ActiveMQTopic; 053import org.apache.activemq.command.Message; 054import org.apache.activemq.command.MessageAck; 055import org.apache.activemq.command.MessageId; 056import org.apache.activemq.command.ProducerId; 057import org.apache.activemq.command.SubscriptionInfo; 058import org.apache.activemq.command.TransactionId; 059import org.apache.activemq.openwire.OpenWireFormat; 060import org.apache.activemq.protobuf.Buffer; 061import org.apache.activemq.store.AbstractMessageStore; 062import org.apache.activemq.store.IndexListener; 063import org.apache.activemq.store.ListenableFuture; 064import org.apache.activemq.store.MessageRecoveryListener; 065import org.apache.activemq.store.MessageStore; 066import org.apache.activemq.store.PersistenceAdapter; 067import org.apache.activemq.store.ProxyMessageStore; 068import org.apache.activemq.store.TopicMessageStore; 069import org.apache.activemq.store.TransactionIdTransformer; 070import org.apache.activemq.store.TransactionStore; 071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 072import org.apache.activemq.store.kahadb.data.KahaDestination; 073import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 074import org.apache.activemq.store.kahadb.data.KahaLocation; 075import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 076import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 077import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 078import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 079import org.apache.activemq.store.kahadb.disk.journal.Location; 080import 
org.apache.activemq.store.kahadb.disk.page.Transaction; 081import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 082import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 083import org.apache.activemq.usage.MemoryUsage; 084import org.apache.activemq.usage.SystemUsage; 085import org.apache.activemq.util.IOExceptionSupport; 086import org.apache.activemq.util.ServiceStopper; 087import org.apache.activemq.util.ThreadPoolUtils; 088import org.apache.activemq.wireformat.WireFormat; 089import org.slf4j.Logger; 090import org.slf4j.LoggerFactory; 091 092public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 093 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 094 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 095 096 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 097 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 098 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 099 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 100 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 101 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 102 103 protected ExecutorService queueExecutor; 104 protected ExecutorService topicExecutor; 105 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 106 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 107 final WireFormat wireFormat = new OpenWireFormat(); 108 private SystemUsage usageManager; 109 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 110 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 111 Semaphore globalQueueSemaphore; 112 Semaphore globalTopicSemaphore; 113 private boolean 
concurrentStoreAndDispatchQueues = true; 114 // when true, message order may be compromised when cache is exhausted if store is out 115 // or order w.r.t cache 116 private boolean concurrentStoreAndDispatchTopics = false; 117 private final boolean concurrentStoreAndDispatchTransactions = false; 118 private int maxAsyncJobs = MAX_ASYNC_JOBS; 119 private final KahaDBTransactionStore transactionStore; 120 private TransactionIdTransformer transactionIdTransformer; 121 122 public KahaDBStore() { 123 this.transactionStore = new KahaDBTransactionStore(this); 124 this.transactionIdTransformer = new TransactionIdTransformer() { 125 @Override 126 public TransactionId transform(TransactionId txid) { 127 return txid; 128 } 129 }; 130 } 131 132 @Override 133 public String toString() { 134 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 135 } 136 137 @Override 138 public void setBrokerName(String brokerName) { 139 } 140 141 @Override 142 public void setUsageManager(SystemUsage usageManager) { 143 this.usageManager = usageManager; 144 } 145 146 public SystemUsage getUsageManager() { 147 return this.usageManager; 148 } 149 150 /** 151 * @return the concurrentStoreAndDispatch 152 */ 153 public boolean isConcurrentStoreAndDispatchQueues() { 154 return this.concurrentStoreAndDispatchQueues; 155 } 156 157 /** 158 * @param concurrentStoreAndDispatch 159 * the concurrentStoreAndDispatch to set 160 */ 161 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 162 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 163 } 164 165 /** 166 * @return the concurrentStoreAndDispatch 167 */ 168 public boolean isConcurrentStoreAndDispatchTopics() { 169 return this.concurrentStoreAndDispatchTopics; 170 } 171 172 /** 173 * @param concurrentStoreAndDispatch 174 * the concurrentStoreAndDispatch to set 175 */ 176 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 177 this.concurrentStoreAndDispatchTopics = 
concurrentStoreAndDispatch; 178 } 179 180 public boolean isConcurrentStoreAndDispatchTransactions() { 181 return this.concurrentStoreAndDispatchTransactions; 182 } 183 184 /** 185 * @return the maxAsyncJobs 186 */ 187 public int getMaxAsyncJobs() { 188 return this.maxAsyncJobs; 189 } 190 191 /** 192 * @param maxAsyncJobs 193 * the maxAsyncJobs to set 194 */ 195 public void setMaxAsyncJobs(int maxAsyncJobs) { 196 this.maxAsyncJobs = maxAsyncJobs; 197 } 198 199 @Override 200 public void doStart() throws Exception { 201 if (brokerService != null) { 202 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 203 wireFormat.setVersion(metadata.openwireVersion); 204 205 if (LOG.isDebugEnabled()) { 206 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 207 } 208 209 } 210 super.doStart(); 211 212 if (brokerService != null) { 213 // In case the recovered store used a different OpenWire version log a warning 214 // to assist in determining why journal reads fail. 
215 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 216 LOG.warn("Recovered Store uses a different OpenWire version[{}] " + 217 "than the version configured[{}].", 218 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 219 } 220 } 221 222 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 223 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 224 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 225 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 226 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 227 asyncQueueJobQueue, new ThreadFactory() { 228 @Override 229 public Thread newThread(Runnable runnable) { 230 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 231 thread.setDaemon(true); 232 return thread; 233 } 234 }); 235 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 236 asyncTopicJobQueue, new ThreadFactory() { 237 @Override 238 public Thread newThread(Runnable runnable) { 239 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 240 thread.setDaemon(true); 241 return thread; 242 } 243 }); 244 } 245 246 @Override 247 public void doStop(ServiceStopper stopper) throws Exception { 248 // drain down async jobs 249 LOG.info("Stopping async queue tasks"); 250 if (this.globalQueueSemaphore != null) { 251 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 252 } 253 synchronized (this.asyncQueueMaps) { 254 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 255 synchronized (m) { 256 for (StoreTask task : m.values()) { 257 task.cancel(); 258 } 259 } 260 } 261 this.asyncQueueMaps.clear(); 262 } 263 LOG.info("Stopping async topic tasks"); 264 if (this.globalTopicSemaphore != null) { 265 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 266 } 267 
synchronized (this.asyncTopicMaps) { 268 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 269 synchronized (m) { 270 for (StoreTask task : m.values()) { 271 task.cancel(); 272 } 273 } 274 } 275 this.asyncTopicMaps.clear(); 276 } 277 if (this.globalQueueSemaphore != null) { 278 this.globalQueueSemaphore.drainPermits(); 279 } 280 if (this.globalTopicSemaphore != null) { 281 this.globalTopicSemaphore.drainPermits(); 282 } 283 if (this.queueExecutor != null) { 284 ThreadPoolUtils.shutdownNow(queueExecutor); 285 queueExecutor = null; 286 } 287 if (this.topicExecutor != null) { 288 ThreadPoolUtils.shutdownNow(topicExecutor); 289 topicExecutor = null; 290 } 291 LOG.info("Stopped KahaDB"); 292 super.doStop(stopper); 293 } 294 295 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 296 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 297 @Override 298 public Location execute(Transaction tx) throws IOException { 299 StoredDestination sd = getStoredDestination(destination, tx); 300 Long sequence = sd.messageIdIndex.get(tx, key); 301 if (sequence == null) { 302 return null; 303 } 304 return sd.orderIndex.get(tx, sequence).location; 305 } 306 }); 307 } 308 309 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 310 StoreQueueTask task = null; 311 synchronized (store.asyncTaskMap) { 312 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 313 } 314 return task; 315 } 316 317 // with asyncTaskMap locked 318 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 319 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 320 this.queueExecutor.execute(task); 321 } 322 323 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 324 StoreTopicTask task = null; 325 synchronized 
(store.asyncTaskMap) { 326 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 327 } 328 return task; 329 } 330 331 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 332 synchronized (store.asyncTaskMap) { 333 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 334 } 335 this.topicExecutor.execute(task); 336 } 337 338 @Override 339 public TransactionStore createTransactionStore() throws IOException { 340 return this.transactionStore; 341 } 342 343 public boolean getForceRecoverIndex() { 344 return this.forceRecoverIndex; 345 } 346 347 public void setForceRecoverIndex(boolean forceRecoverIndex) { 348 this.forceRecoverIndex = forceRecoverIndex; 349 } 350 351 public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException { 352 if (preparedAcks != null) { 353 Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>(); 354 for (MessageAck ack : preparedAcks) { 355 stores.put(ack.getDestination(), findMatchingStore(ack.getDestination())); 356 } 357 ArrayList<MessageAck> perStoreAcks = new ArrayList<>(); 358 for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) { 359 for (MessageAck ack : preparedAcks) { 360 if (entry.getKey().equals(ack.getDestination())) { 361 perStoreAcks.add(ack); 362 } 363 } 364 entry.getValue().forgetRecoveredAcks(perStoreAcks, isRollback); 365 perStoreAcks.clear(); 366 } 367 } 368 } 369 370 public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException { 371 Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>(); 372 for (MessageAck ack : preparedAcks) { 373 stores.put(ack.getDestination(), findMatchingStore(ack.getDestination())); 374 } 375 ArrayList<MessageAck> perStoreAcks = new ArrayList<>(); 376 for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) { 377 for 
(MessageAck ack : preparedAcks) { 378 if (entry.getKey().equals(ack.getDestination())) { 379 perStoreAcks.add(ack); 380 } 381 } 382 entry.getValue().trackRecoveredAcks(perStoreAcks); 383 perStoreAcks.clear(); 384 } 385 } 386 387 private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException { 388 ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination)); 389 if (store == null) { 390 if (activeMQDestination.isQueue()) { 391 store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination); 392 } else { 393 store = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination); 394 } 395 } 396 return (KahaDBMessageStore) store.getDelegate(); 397 } 398 399 public class KahaDBMessageStore extends AbstractMessageStore { 400 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 401 protected KahaDestination dest; 402 private final int maxAsyncJobs; 403 private final Semaphore localDestinationSemaphore; 404 protected final HashMap<String, Set<String>> ackedAndPreparedMap = new HashMap<String, Set<String>>(); 405 protected final HashMap<String, Set<String>> rolledBackAcksMap = new HashMap<String, Set<String>>(); 406 407 double doneTasks, canceledTasks = 0; 408 409 public KahaDBMessageStore(ActiveMQDestination destination) { 410 super(destination); 411 this.dest = convert(destination); 412 this.maxAsyncJobs = getMaxAsyncJobs(); 413 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 414 } 415 416 @Override 417 public ActiveMQDestination getDestination() { 418 return destination; 419 } 420 421 422 private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) { 423 return destination.isQueue() ? 
destination.getPhysicalName() : ack.getConsumerId().getConnectionId(); 424 } 425 426 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 427 // till then they are skipped by the store. 428 // 'at most once' XA guarantee 429 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 430 indexLock.writeLock().lock(); 431 try { 432 for (MessageAck ack : acks) { 433 final String key = recoveredTxStateMapKey(destination, ack); 434 Set ackedAndPrepared = ackedAndPreparedMap.get(key); 435 if (ackedAndPrepared == null) { 436 ackedAndPrepared = new LinkedHashSet<String>(); 437 ackedAndPreparedMap.put(key, ackedAndPrepared); 438 } 439 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 440 } 441 } finally { 442 indexLock.writeLock().unlock(); 443 } 444 } 445 446 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 447 if (acks != null) { 448 indexLock.writeLock().lock(); 449 try { 450 for (MessageAck ack : acks) { 451 final String id = ack.getLastMessageId().toProducerKey(); 452 final String key = recoveredTxStateMapKey(destination, ack); 453 Set ackedAndPrepared = ackedAndPreparedMap.get(key); 454 if (ackedAndPrepared != null) { 455 ackedAndPrepared.remove(id); 456 if (ackedAndPreparedMap.isEmpty()) { 457 ackedAndPreparedMap.remove(key); 458 } 459 } 460 if (rollback) { 461 Set rolledBackAcks = rolledBackAcksMap.get(key); 462 if (rolledBackAcks == null) { 463 rolledBackAcks = new LinkedHashSet<String>(); 464 rolledBackAcksMap.put(key, rolledBackAcks); 465 } 466 rolledBackAcks.add(id); 467 //incrementAndAddSizeToStoreStat(dest, 0); 468 } 469 } 470 } finally { 471 indexLock.writeLock().unlock(); 472 } 473 } 474 } 475 476 @Override 477 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 478 throws IOException { 479 if (isConcurrentStoreAndDispatchQueues()) { 480 message.beforeMarshall(wireFormat); 481 StoreQueueTask 
result = new StoreQueueTask(this, context, message); 482 ListenableFuture<Object> future = result.getFuture(); 483 message.getMessageId().setFutureOrSequenceLong(future); 484 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 485 result.aquireLocks(); 486 synchronized (asyncTaskMap) { 487 addQueueTask(this, result); 488 if (indexListener != null) { 489 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 490 } 491 } 492 return future; 493 } else { 494 return super.asyncAddQueueMessage(context, message); 495 } 496 } 497 498 @Override 499 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 500 if (isConcurrentStoreAndDispatchQueues()) { 501 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 502 StoreQueueTask task = null; 503 synchronized (asyncTaskMap) { 504 task = (StoreQueueTask) asyncTaskMap.get(key); 505 } 506 if (task != null) { 507 if (ack.isInTransaction() || !task.cancel()) { 508 try { 509 task.future.get(); 510 } catch (InterruptedException e) { 511 throw new InterruptedIOException(e.toString()); 512 } catch (Exception ignored) { 513 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 514 } 515 removeMessage(context, ack); 516 } else { 517 indexLock.writeLock().lock(); 518 try { 519 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 520 } finally { 521 indexLock.writeLock().unlock(); 522 } 523 synchronized (asyncTaskMap) { 524 asyncTaskMap.remove(key); 525 } 526 } 527 } else { 528 removeMessage(context, ack); 529 } 530 } else { 531 removeMessage(context, ack); 532 } 533 } 534 535 @Override 536 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 537 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 538 command.setDestination(dest); 539 command.setMessageId(message.getMessageId().toProducerKey()); 540 
command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 541 command.setPriority(message.getPriority()); 542 command.setPrioritySupported(isPrioritizedMessages()); 543 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 544 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 545 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 546 // sync add? (for async, future present from getFutureOrSequenceLong) 547 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 548 549 @Override 550 public void sequenceAssignedWithIndexLocked(final long sequence) { 551 message.getMessageId().setFutureOrSequenceLong(sequence); 552 if (indexListener != null) { 553 if (possibleFuture == null) { 554 trackPendingAdd(dest, sequence); 555 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 556 @Override 557 public void run() { 558 trackPendingAddComplete(dest, sequence); 559 } 560 })); 561 } 562 } 563 } 564 }, null); 565 } 566 567 @Override 568 public void updateMessage(Message message) throws IOException { 569 if (LOG.isTraceEnabled()) { 570 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 571 } 572 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 573 KahaAddMessageCommand command = new KahaAddMessageCommand(); 574 command.setDestination(dest); 575 command.setMessageId(message.getMessageId().toProducerKey()); 576 command.setPriority(message.getPriority()); 577 command.setPrioritySupported(prioritizedMessages); 578 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 579 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 580 updateMessageCommand.setMessage(command); 581 store(updateMessageCommand, 
isEnableJournalDiskSyncs(), null, null); 582 } 583 584 @Override 585 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 586 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 587 command.setDestination(dest); 588 command.setMessageId(ack.getLastMessageId().toProducerKey()); 589 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 590 591 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 592 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 593 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 594 } 595 596 @Override 597 public void removeAllMessages(ConnectionContext context) throws IOException { 598 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 599 command.setDestination(dest); 600 store(command, true, null, null); 601 } 602 603 @Override 604 public Message getMessage(MessageId identity) throws IOException { 605 final String key = identity.toProducerKey(); 606 607 // Hopefully one day the page file supports concurrent read 608 // operations... but for now we must 609 // externally synchronize... 610 Location location; 611 indexLock.writeLock().lock(); 612 try { 613 location = findMessageLocation(key, dest); 614 } finally { 615 indexLock.writeLock().unlock(); 616 } 617 if (location == null) { 618 return null; 619 } 620 621 return loadMessage(location); 622 } 623 624 @Override 625 public int getMessageCount() throws IOException { 626 try { 627 lockAsyncJobQueue(); 628 indexLock.writeLock().lock(); 629 try { 630 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 631 @Override 632 public Integer execute(Transaction tx) throws IOException { 633 // Iterate through all index entries to get a count 634 // of messages in the destination. 
635 StoredDestination sd = getStoredDestination(dest, tx); 636 int rc = 0; 637 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 638 iterator.next(); 639 rc++; 640 } 641 Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); 642 if (ackedAndPrepared != null) { 643 rc = rc - ackedAndPrepared.size(); 644 } 645 return rc; 646 } 647 }); 648 } finally { 649 indexLock.writeLock().unlock(); 650 } 651 } finally { 652 unlockAsyncJobQueue(); 653 } 654 } 655 656 @Override 657 public boolean isEmpty() throws IOException { 658 indexLock.writeLock().lock(); 659 try { 660 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 661 @Override 662 public Boolean execute(Transaction tx) throws IOException { 663 // Iterate through all index entries to get a count of 664 // messages in the destination. 665 StoredDestination sd = getStoredDestination(dest, tx); 666 return sd.locationIndex.isEmpty(tx); 667 } 668 }); 669 } finally { 670 indexLock.writeLock().unlock(); 671 } 672 } 673 674 @Override 675 public void recover(final MessageRecoveryListener listener) throws Exception { 676 // recovery may involve expiry which will modify 677 indexLock.writeLock().lock(); 678 try { 679 pageFile.tx().execute(new Transaction.Closure<Exception>() { 680 @Override 681 public void execute(Transaction tx) throws Exception { 682 StoredDestination sd = getStoredDestination(dest, tx); 683 recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); 684 sd.orderIndex.resetCursorPosition(); 685 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 686 .hasNext(); ) { 687 Entry<Long, MessageKeys> entry = iterator.next(); 688 Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); 689 if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { 690 continue; 691 } 692 
Message msg = loadMessage(entry.getValue().location); 693 listener.recoverMessage(msg); 694 } 695 } 696 }); 697 } finally { 698 indexLock.writeLock().unlock(); 699 } 700 } 701 702 @Override 703 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 704 indexLock.writeLock().lock(); 705 try { 706 pageFile.tx().execute(new Transaction.Closure<Exception>() { 707 @Override 708 public void execute(Transaction tx) throws Exception { 709 StoredDestination sd = getStoredDestination(dest, tx); 710 Entry<Long, MessageKeys> entry = null; 711 int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); 712 Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); 713 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 714 entry = iterator.next(); 715 if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { 716 continue; 717 } 718 Message msg = loadMessage(entry.getValue().location); 719 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 720 listener.recoverMessage(msg); 721 counter++; 722 if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { 723 break; 724 } 725 } 726 sd.orderIndex.stoppedIterating(); 727 } 728 }); 729 } finally { 730 indexLock.writeLock().unlock(); 731 } 732 } 733 734 protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 735 int counter = 0; 736 String id; 737 738 Set rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey); 739 if (rolledBackAcks == null) { 740 return counter; 741 } 742 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 743 id = iterator.next(); 744 iterator.remove(); 745 Long sequence = sd.messageIdIndex.get(tx, id); 746 if (sequence != null) { 747 if 
(sd.orderIndex.alreadyDispatched(sequence)) { 748 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 749 counter++; 750 if (counter >= maxReturned) { 751 break; 752 } 753 } else { 754 LOG.debug("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 755 } 756 } else { 757 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 758 } 759 } 760 if (rolledBackAcks.isEmpty()) { 761 rolledBackAcksMap.remove(recoveredTxStateMapKey); 762 } 763 return counter; 764 } 765 766 767 @Override 768 public void resetBatching() { 769 if (pageFile.isLoaded()) { 770 indexLock.writeLock().lock(); 771 try { 772 pageFile.tx().execute(new Transaction.Closure<Exception>() { 773 @Override 774 public void execute(Transaction tx) throws Exception { 775 StoredDestination sd = getExistingStoredDestination(dest, tx); 776 if (sd != null) { 777 sd.orderIndex.resetCursorPosition();} 778 } 779 }); 780 } catch (Exception e) { 781 LOG.error("Failed to reset batching",e); 782 } finally { 783 indexLock.writeLock().unlock(); 784 } 785 } 786 } 787 788 @Override 789 public void setBatch(final MessageId identity) throws IOException { 790 indexLock.writeLock().lock(); 791 try { 792 pageFile.tx().execute(new Transaction.Closure<IOException>() { 793 @Override 794 public void execute(Transaction tx) throws IOException { 795 StoredDestination sd = getStoredDestination(dest, tx); 796 Long location = (Long) identity.getFutureOrSequenceLong(); 797 Long pending = sd.orderIndex.minPendingAdd(); 798 if (pending != null) { 799 location = Math.min(location, pending-1); 800 } 801 sd.orderIndex.setBatch(tx, location); 802 } 803 }); 804 } finally { 805 indexLock.writeLock().unlock(); 806 } 807 } 808 809 @Override 810 public void setMemoryUsage(MemoryUsage memoryUsage) { 811 } 812 @Override 813 public void start() throws Exception { 814 super.start(); 815 // exercise the store to ensure creation 816 isEmpty(); 817 } 818 
@Override
        public void stop() throws Exception {
            super.stop();
        }

        /**
         * Tries for up to 60 seconds to take all {@code maxAsyncJobs} permits of the
         * per-destination semaphore. A timeout or any other failure is logged and
         * not propagated; an interrupt additionally restores the thread's interrupt
         * flag so callers further up can observe it.
         */
        protected void lockAsyncJobQueue() {
            try {
                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not lose the interruption; only the logging is best-effort.
                    Thread.currentThread().interrupt();
                }
                LOG.error("Failed to lock async jobs for " + this.destination, e);
            }
        }

        /** Returns {@code maxAsyncJobs} permits to the per-destination semaphore. */
        protected void unlockAsyncJobQueue() {
            this.localDestinationSemaphore.release(this.maxAsyncJobs);
        }

        /**
         * Takes one permit of the per-destination semaphore. An interrupt is logged
         * and the thread's interrupt flag is restored; no exception is propagated.
         */
        protected void acquireLocalAsyncLock() {
            try {
                this.localDestinationSemaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Failed to aquire async lock for " + this.destination, e);
            }
        }

        /** Returns one permit to the per-destination semaphore. */
        protected void releaseLocalAsyncLock() {
            this.localDestinationSemaphore.release();
        }

        @Override
        public String toString(){
            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
        }
    }

    /**
     * Topic flavour of {@link KahaDBMessageStore}: adds durable-subscription
     * bookkeeping (add/delete/lookup), per-subscription acknowledgement and
     * per-subscription recovery on top of the queue store behaviour.
     */
    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
        /** Number of subscriptions; seeded from the index and adjusted on add/delete. */
        private final AtomicInteger subscriptionCount = new AtomicInteger();

        /**
         * @param destination topic backed by this store
         * @throws IOException if the existing subscriptions cannot be read
         */
        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
            super(destination);
            this.subscriptionCount.set(getAllSubscriptions().length);
            if (isConcurrentStoreAndDispatchTopics()) {
                asyncTopicMaps.add(asyncTaskMap);
            }
        }

        /**
         * Stores a topic message. With concurrent store-and-dispatch enabled the
         * message is wrapped in a {@link StoreTopicTask} that is queued via
         * {@code addTopicTask}; otherwise the superclass implementation is used.
         *
         * @return a future that completes when the store task finishes
         */
        @Override
        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
                throws IOException {
            if (isConcurrentStoreAndDispatchTopics()) {
                message.beforeMarshall(wireFormat);
                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
                result.aquireLocks();
                addTopicTask(this, result);
                return result.getFuture();
            } else {
                return super.asyncAddTopicMessage(context, message);
            }
        }

        /**
         * Acknowledges a message for one subscription.
         * <p>
         * If an async store task for the message is still pending, the subscription
         * key is recorded on that task instead of being journaled; once the task
         * reports that all subscriptions have acked, the task is removed and
         * cancelled. Otherwise the ack is journaled via {@link #doAcknowledge}.
         */
        @Override
        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                MessageId messageId, MessageAck ack) throws IOException {
            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
            if (isConcurrentStoreAndDispatchTopics()) {
                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
                StoreTopicTask task = null;
                synchronized (asyncTaskMap) {
                    task = (StoreTopicTask) asyncTaskMap.get(key);
                }
                if (task != null) {
                    if (task.addSubscriptionKey(subscriptionKey)) {
                        removeTopicTask(this, messageId);
                        if (task.cancel()) {
                            synchronized (asyncTaskMap) {
                                asyncTaskMap.remove(key);
                            }
                        }
                    }
                } else {
                    doAcknowledge(context, subscriptionKey, messageId, ack);
                }
            } else {
                doAcknowledge(context, subscriptionKey, messageId, ack);
            }
        }

        /**
         * Journals a remove-message command for the given subscription.
         *
         * @param ack may be {@code null} (the async topic task passes null); in that
         *            case no transaction info is set and {@code wireFormat.marshal(null)}
         *            supplies the ack payload
         */
        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
                throws IOException {
            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey);
            command.setMessageId(messageId.toProducerKey());
            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
            if (ack != null && ack.isUnmatchedAck()) {
                command.setAck(UNMATCHED);
            } else {
                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            }
            store(command, false, null, null);
        }

        /**
         * Journals a subscription command carrying the marshalled
         * {@code subscriptionInfo} and increments the subscription count.
         */
        @Override
        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
                    .getSubscriptionName());
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey);
            command.setRetroactive(retroactive);
            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
            store(command, isEnableJournalDiskSyncs(), null, null);
            this.subscriptionCount.incrementAndGet();
        }

        /**
         * Journals a subscription command without subscription info for the given
         * key and decrements the subscription count.
         */
        @Override
        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
            command.setDestination(dest);
            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
            store(command, isEnableJournalDiskSyncs(), null, null);
            this.subscriptionCount.decrementAndGet();
        }

        /**
         * @return every subscription stored for this destination, unmarshalled from
         *         the subscriptions index; empty array when there are none
         */
        @Override
        public SubscriptionInfo[] getAllSubscriptions() throws IOException {

            final List<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
                                .hasNext();) {
                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
                                    .getValue().getSubscriptionInfo().newInput()));
                            subscriptions.add(info);

                        }
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }

            return subscriptions.toArray(new SubscriptionInfo[subscriptions.size()]);
        }

        /**
         * @return the stored subscription for the given client/name, or {@code null}
         *         when the subscriptions index has no entry for it
         */
        @Override
        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                    @Override
                    public SubscriptionInfo execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
                        if (command == null) {
                            return null;
                        }
                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
                                .getSubscriptionInfo().newInput()));
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }

        /**
         * @return the stored message count for the subscription (narrowed to int),
         *         or 0 when no last-ack record exists for it
         */
        @Override
        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                    @Override
                    public Integer execute(Transaction tx) throws IOException {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                        if (cursorPos == null) {
                            // The subscription might not exist.
                            return 0;
                        }

                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }

        /**
         * Recovers every pending message of a subscription.
         * <p>
         * The order index is positioned either just before the first tracked ack
         * position (when that lies before the last acked sequence) or at the last
         * ack. Rolled-back acks are replayed first, then the index is iterated,
         * skipping acked-and-prepared messages and, when ack positions are used,
         * sequences no longer contained in them. The cursor is reset afterwards.
         * Nothing is recovered when the subscription has no last-ack record.
         */
        @Override
        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
                throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            @SuppressWarnings("unused")
            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                        if (cursorPos == null) {
                            // Same guard as getMessageCount()/recoverNextMessages(): without a
                            // last-ack record there is no subscription state to recover from.
                            return;
                        }
                        SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
                        //If we have ackPositions tracked then compare the first one as individual acknowledge mode
                        //may have bumped lastAck even though there are earlier messages to still consume
                        if (subAckPositions != null && !subAckPositions.isEmpty()
                                && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
                            //we have messages to ack before lastAckedSequence
                            sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
                        } else {
                            subAckPositions = null;
                            sd.orderIndex.setBatch(tx, cursorPos);
                        }
                        recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener);
                        Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                .hasNext();) {
                            Entry<Long, MessageKeys> entry = iterator.next();
                            if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
                                continue;
                            }
                            //If subAckPositions is set then verify the sequence set contains the message still
                            //and if it doesn't skip it
                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
                                continue;
                            }
                            listener.recoverMessage(loadMessage(entry.getValue().location));
                        }
                        sd.orderIndex.resetCursorPosition();
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }

        /**
         * Recovers the next batch of messages for a subscription.
         * <p>
         * Iteration resumes from the cursor remembered for the subscription in
         * {@code sd.subscriptionCursors}; when none is remembered the start position
         * is derived from the last ack / ack positions as in
         * {@link #recoverSubscription}. Only messages the listener accepts
         * ({@code recoverMessage} returns true) count towards {@code maxReturned}.
         * If at least one index entry was visited, a copy of the cursor is stored
         * for the next call.
         */
        @Override
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            @SuppressWarnings("unused")
            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    @Override
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        sd.orderIndex.resetCursorPosition();
                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                        SequenceSet subAckPositions = null;
                        if (moc == null) {
                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
                            if (pos == null) {
                                // sub deleted
                                return;
                            }
                            subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
                            //If we have ackPositions tracked then compare the first one as individual acknowledge mode
                            //may have bumped lastAck even though there are earlier messages to still consume
                            if (subAckPositions != null && !subAckPositions.isEmpty()
                                    && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
                                //we have messages to ack before lastAckedSequence
                                sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
                            } else {
                                subAckPositions = null;
                                sd.orderIndex.setBatch(tx, pos);
                            }
                            moc = sd.orderIndex.cursor;
                        } else {
                            sd.orderIndex.cursor.sync(moc);
                        }

                        Entry<Long, MessageKeys> entry = null;
                        int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener);
                        Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey);
                        // The "counter < maxReturned" guard keeps the batch within maxReturned
                        // when the rolled-back acks alone already filled it.
                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc);
                                counter < maxReturned && iterator.hasNext();) {
                            entry = iterator.next();
                            if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
                                continue;
                            }
                            //If subAckPositions is set then verify the sequence set contains the message still
                            //and if it doesn't skip it
                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
                                continue;
                            }
                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                counter++;
                            }
                            if (counter >= maxReturned || !listener.hasSpace()) {
                                break;
                            }
                        }
                        sd.orderIndex.stoppedIterating();
                        if (entry != null) {
                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                            sd.subscriptionCursors.put(subscriptionKey, copy);
                        }
                    }
                });
            } finally {
                indexLock.writeLock().unlock();
            }
        }

        /**
         * Forgets the cursor remembered for the subscription so the next
         * {@code recoverNextMessages} call recomputes its start position.
         *
         * @throws RuntimeException wrapping any {@link IOException} from the index
         */
        @Override
        public void resetBatching(String clientId, String subscriptionName) {
            try {
                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
                indexLock.writeLock().lock();
                try {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            StoredDestination sd = getStoredDestination(dest, tx);
                            sd.subscriptionCursors.remove(subscriptionKey);
                        }
                    });
                }finally {
                    indexLock.writeLock().unlock();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds the index key of a durable subscription.
     *
     * @return {@code clientId + ":" + subscriptionName}
     */
    String subscriptionKey(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

@Override 1163 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1164 String key = key(convert(destination)); 1165 MessageStore store = storeCache.get(key(convert(destination))); 1166 if (store == null) { 1167 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1168 synchronized (storeCache) { 1169 store = storeCache.put(key, queueStore); 1170 if (store != null) { 1171 storeCache.put(key, store); 1172 } else { 1173 store = queueStore; 1174 } 1175 } 1176 } 1177 1178 return store; 1179 } 1180 1181 @Override 1182 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1183 String key = key(convert(destination)); 1184 MessageStore store = storeCache.get(key(convert(destination))); 1185 if (store == null) { 1186 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1187 synchronized (storeCache) { 1188 store = storeCache.put(key, topicStore); 1189 if (store != null) { 1190 storeCache.put(key, store); 1191 } else { 1192 store = topicStore; 1193 } 1194 } 1195 } 1196 1197 return (TopicMessageStore) store; 1198 } 1199 1200 /** 1201 * Cleanup method to remove any state associated with the given destination. 1202 * This method does not stop the message store (it might not be cached). 1203 * 1204 * @param destination 1205 * Destination to forget 1206 */ 1207 @Override 1208 public void removeQueueMessageStore(ActiveMQQueue destination) { 1209 } 1210 1211 /** 1212 * Cleanup method to remove any state associated with the given destination 1213 * This method does not stop the message store (it might not be cached). 
1214 * 1215 * @param destination 1216 * Destination to forget 1217 */ 1218 @Override 1219 public void removeTopicMessageStore(ActiveMQTopic destination) { 1220 } 1221 1222 @Override 1223 public void deleteAllMessages() throws IOException { 1224 deleteAllMessages = true; 1225 } 1226 1227 @Override 1228 public Set<ActiveMQDestination> getDestinations() { 1229 try { 1230 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1231 indexLock.writeLock().lock(); 1232 try { 1233 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1234 @Override 1235 public void execute(Transaction tx) throws IOException { 1236 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1237 .hasNext();) { 1238 Entry<String, StoredDestination> entry = iterator.next(); 1239 //Removing isEmpty topic check - see AMQ-5875 1240 rc.add(convert(entry.getKey())); 1241 } 1242 } 1243 }); 1244 }finally { 1245 indexLock.writeLock().unlock(); 1246 } 1247 return rc; 1248 } catch (IOException e) { 1249 throw new RuntimeException(e); 1250 } 1251 } 1252 1253 @Override 1254 public long getLastMessageBrokerSequenceId() throws IOException { 1255 return 0; 1256 } 1257 1258 @Override 1259 public long getLastProducerSequenceId(ProducerId id) { 1260 indexLock.writeLock().lock(); 1261 try { 1262 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1263 } finally { 1264 indexLock.writeLock().unlock(); 1265 } 1266 } 1267 1268 @Override 1269 public long size() { 1270 try { 1271 return journalSize.get() + getPageFile().getDiskSize(); 1272 } catch (IOException e) { 1273 throw new RuntimeException(e); 1274 } 1275 } 1276 1277 @Override 1278 public void beginTransaction(ConnectionContext context) throws IOException { 1279 throw new IOException("Not yet implemented."); 1280 } 1281 @Override 1282 public void commitTransaction(ConnectionContext context) throws IOException { 1283 throw new IOException("Not yet implemented."); 1284 } 1285 
@Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        // Not supported: always throws.
        throw new IOException("Not yet implemented.");
    }

    /** Delegates to {@code super.checkpointCleanup(sync)}. */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(sync);
    }

    // /////////////////////////////////////////////////////////////////
    // Internal helper methods.
    // /////////////////////////////////////////////////////////////////

    /**
     * Loads the journal record at {@code location} and unmarshals the message it
     * carries. Both add-message and update-message commands are accepted.
     *
     * @param location journal location of the record
     * @return the unmarshalled message
     * @throws IOException for any failure (unexpected command type, record without
     *                     message content, read or unmarshal error); the failure is
     *                     also logged and passed to
     *                     {@code brokerService.handleIOException} before being thrown
     */
    Message loadMessage(Location location) throws IOException {
        try {
            JournalCommand<?> command = load(location);
            KahaAddMessageCommand addMessage = null;
            switch (command.type()) {
                case KAHA_UPDATE_MESSAGE_COMMAND:
                    addMessage = ((KahaUpdateMessageCommand) command).getMessage();
                    break;
                case KAHA_ADD_MESSAGE_COMMAND:
                    addMessage = (KahaAddMessageCommand) command;
                    break;
                default:
                    throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location);
            }
            if (!addMessage.hasMessage()) {
                throw new IOException("Could not load journal record, null message content at location: " + location);
            }
            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
            return msg;
        } catch (Throwable t) {
            IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t);
            LOG.error("Failed to load message at: {}", location , ioe);
            brokerService.handleIOException(ioe);
            throw ioe;
        }
    }

    // /////////////////////////////////////////////////////////////////
    // Internal conversion methods.
    // /////////////////////////////////////////////////////////////////

    /** Copies the data file id and offset of a journal location into a {@link KahaLocation}. */
    KahaLocation convert(Location location) {
        KahaLocation rc = new KahaLocation();
        rc.setLogId(location.getDataFileId());
        rc.setOffset(location.getOffset());
        return rc;
    }

    /**
     * Converts a broker destination to its {@link KahaDestination} form.
     *
     * @return the converted destination, or {@code null} when the destination type
     *         is not queue, topic, temp queue or temp topic
     */
    KahaDestination convert(ActiveMQDestination dest) {
        KahaDestination rc = new KahaDestination();
        rc.setName(dest.getPhysicalName());
        switch (dest.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            rc.setType(DestinationType.QUEUE);
            return rc;
        case ActiveMQDestination.TOPIC_TYPE:
            rc.setType(DestinationType.TOPIC);
            return rc;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            rc.setType(DestinationType.TEMP_QUEUE);
            return rc;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            rc.setType(DestinationType.TEMP_TOPIC);
            return rc;
        default:
            return null;
        }
    }

    /**
     * Parses a {@code "<type number>:<name>"} key into a destination.
     *
     * @throws IllegalArgumentException when there is no {@code ':'} separator, the
     *                                  type prefix is not an integer, or the type is
     *                                  not one of the four handled kinds
     */
    ActiveMQDestination convert(String dest) {
        int p = dest.indexOf(":");
        if (p < 0) {
            throw new IllegalArgumentException("Not in the valid destination format");
        }
        int type = Integer.parseInt(dest.substring(0, p));
        String name = dest.substring(p + 1);
        return convert(type, name);
    }

    /** Converts a {@link KahaDestination} back to a broker destination. */
    private ActiveMQDestination convert(KahaDestination commandDestination) {
        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
    }

    /** Creates the destination object matching the numeric Kaha destination type. */
    private ActiveMQDestination convert(int type, String name) {
        switch (KahaDestination.DestinationType.valueOf(type)) {
        case QUEUE:
            return new ActiveMQQueue(name);
        case TOPIC:
            return new ActiveMQTopic(name);
        case TEMP_QUEUE:
            return new ActiveMQTempQueue(name);
        case TEMP_TOPIC:
            return new ActiveMQTempTopic(name);
        default:
            throw new IllegalArgumentException("Not in the valid destination format");
        }
    }

    public TransactionIdTransformer getTransactionIdTransformer() {
        return transactionIdTransformer;
    }

    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        this.transactionIdTransformer = transactionIdTransformer;
    }

    /** Map key identifying an async store task by message id and destination. */
    static class AsyncJobKey {
        MessageId id;
        ActiveMQDestination destination;

        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
            this.id = id;
            this.destination = destination;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
                    && destination.equals(((AsyncJobKey) obj).destination);
        }

        @Override
        public int hashCode() {
            return id.hashCode() + destination.hashCode();
        }

        @Override
        public String toString() {
            return destination.getPhysicalName() + "-" + id;
        }
    }

    /** Contract of an async store task: cancellable, with paired lock acquire/release. */
    public interface StoreTask {
        public boolean cancel();

        public void aquireLocks();

        public void releaseLocks();
    }

    /**
     * Async task that adds one message to a queue store. The {@code done} flag
     * arbitrates between {@link #run()} and {@link #cancel()}: whichever flips it
     * first wins. The {@code locked} flag makes acquire/release idempotent.
     */
    class StoreQueueTask implements Runnable, StoreTask {
        protected final Message message;
        protected final ConnectionContext context;
        protected final KahaDBMessageStore store;
        protected final InnerFutureTask future;
        protected final AtomicBoolean done = new AtomicBoolean();
        protected final AtomicBoolean locked = new AtomicBoolean();

        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
            this.store = store;
            this.context = context;
            this.message = message;
            this.future = new InnerFutureTask(this);
        }

        public ListenableFuture<Object> getFuture() {
            return this.future;
        }

        /**
         * @return the result of cancelling the future if this call won the
         *         {@code done} flag; {@code false} if the task already ran or was
         *         already cancelled
         */
        @Override
        public boolean cancel() {
            if (this.done.compareAndSet(false, true)) {
                return this.future.cancel(false);
            }
            return false;
        }

        /**
         * Takes the global queue permit and the store's local permit and bumps the
         * message reference count, once per task.
         */
        @Override
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    globalQueueSemaphore.acquire();
                    store.acquireLocalAsyncLock();
                    message.incrementReferenceCount();
                } catch (InterruptedException e) {
                    // The global acquire is the first statement, so nothing has been
                    // taken when it is interrupted. Clear the flag so releaseLocks()
                    // does not release permits / a reference that were never acquired,
                    // and keep the interruption visible to the caller.
                    this.locked.set(false);
                    Thread.currentThread().interrupt();
                    LOG.warn("Failed to aquire lock", e);
                }
            }

        }

        /** Undoes {@link #aquireLocks()}; a no-op when the locks are not held. */
        @Override
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                store.releaseLocalAsyncLock();
                globalQueueSemaphore.release();
                message.decrementReferenceCount();
            }
        }

        /**
         * Adds the message to the store unless the task was cancelled first. When
         * {@code cancelledTaskModMetric} is positive, every that-many-th cancelled
         * task prints a cancel ratio to {@code System.err} and resets the counters.
         * Any throwable is recorded on the future.
         */
        @Override
        public void run() {
            this.store.doneTasks++;
            try {
                if (this.done.compareAndSet(false, true)) {
                    this.store.addMessage(context, message);
                    removeQueueTask(this.store, this.message.getMessageId());
                    this.future.complete();
                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
                    System.err.println(this.store.dest.getName() + " cancelled: "
                                       + (this.store.canceledTasks / this.store.doneTasks) * 100);
                    this.store.canceledTasks = this.store.doneTasks = 0;
                }
            } catch (Throwable t) {
                this.future.setException(t);
                removeQueueTask(this.store, this.message.getMessageId());
            }
        }

        protected Message getMessage() {
            return this.message;
        }

        /**
         * Future exposed to callers. It is completed explicitly through
         * {@link #complete()} / {@link #setException(Throwable)} and runs at most one
         * registered listener when done.
         */
        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {

            private final AtomicReference<Runnable> listenerRef = new AtomicReference<>();

            public InnerFutureTask(Runnable runnable) {
                super(runnable, null);
            }

            @Override
            public void setException(final Throwable e) {
                super.setException(e);
            }

            public void complete() {
                super.set(null);
            }

            @Override
            public void done() {
                fireListener();
            }

            /**
             * Registers the listener (replacing any previous one) and fires it
             * immediately when the future is already done.
             */
            @Override
            public void addListener(Runnable listener) {
                this.listenerRef.set(listener);
                if (isDone()) {
                    fireListener();
                }
            }

            /** Runs the registered listener at most once; listener exceptions are logged. */
            private void fireListener() {
                Runnable listener = listenerRef.getAndSet(null);
                if (listener != null) {
                    try {
                        listener.run();
                    } catch (Exception ignored) {
                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
                    }
                }
            }
        }
    }

    /**
     * Async task that adds one message to a topic store and applies the acks that
     * subscriptions registered on the task while it was pending.
     */
    class StoreTopicTask extends StoreQueueTask {
        private final int subscriptionCount;
        private final List<String> subscriptionKeys = new ArrayList<String>(1);
        private final KahaDBTopicMessageStore topicStore;
        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
                int subscriptionCount) {
            super(store, context, message);
            this.topicStore = store;
            this.subscriptionCount = subscriptionCount;

        }

        /**
         * Takes the global topic permit and the store's local permit and bumps the
         * message reference count, once per task.
         */
        @Override
        public void aquireLocks() {
            if (this.locked.compareAndSet(false, true)) {
                try {
                    globalTopicSemaphore.acquire();
                    store.acquireLocalAsyncLock();
                    message.incrementReferenceCount();
                } catch (InterruptedException e) {
                    // The global acquire is the first statement, so nothing has been
                    // taken when it is interrupted. Clear the flag so releaseLocks()
                    // does not release permits / a reference that were never acquired,
                    // and keep the interruption visible to the caller.
                    this.locked.set(false);
                    Thread.currentThread().interrupt();
                    LOG.warn("Failed to aquire lock", e);
                }
            }
        }

        /** Undoes {@link #aquireLocks()}; a no-op when the locks are not held. */
        @Override
        public void releaseLocks() {
            if (this.locked.compareAndSet(true, false)) {
                message.decrementReferenceCount();
                store.releaseLocalAsyncLock();
                globalTopicSemaphore.release();
            }
        }

        /**
         * add a key
         *
         * @param key subscription key that acknowledged the message
         * @return true if all acknowledgements received
         */
        public boolean addSubscriptionKey(String key) {
            synchronized (this.subscriptionKeys) {
                this.subscriptionKeys.add(key);
                // Read the size under the same lock that guards the list.
                return this.subscriptionKeys.size() >= this.subscriptionCount;
            }
        }

        /**
         * Adds the message to the topic store and journals an ack for every
         * subscription key collected so far, unless the task was cancelled first.
         * Cancel-metric reporting mirrors {@link StoreQueueTask#run()}.
         */
        @Override
        public void run() {
            this.store.doneTasks++;
            try {
                if (this.done.compareAndSet(false, true)) {
                    this.topicStore.addMessage(context, message);
                    // apply any acks we have
                    synchronized (this.subscriptionKeys) {
                        for (String key : this.subscriptionKeys) {
                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);

                        }
                    }
                    removeTopicTask(this.topicStore, this.message.getMessageId());
                    this.future.complete();
                // Pre-increment, as in StoreQueueTask.run(): with post-increment the test saw
                // 0 after every reset and the metric was printed on each cancellation.
                } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) {
                    System.err.println(this.store.dest.getName() + " cancelled: "
                                       + (this.store.canceledTasks / this.store.doneTasks) * 100);
                    this.store.canceledTasks = this.store.doneTasks = 0;
                }
            } catch (Throwable t) {
                this.future.setException(t);
                removeTopicTask(this.topicStore, this.message.getMessageId());
            }
        }
    }

    /**
     * Thread pool that releases a {@link StoreTask}'s locks after the task has
     * been executed, whether it succeeded or failed.
     */
    public class StoreTaskExecutor extends ThreadPoolExecutor {

        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
        }

        @Override
        protected void afterExecute(Runnable runnable, Throwable throwable) {
            super.afterExecute(runnable, throwable);

            if (runnable instanceof StoreTask) {
               ((StoreTask)runnable).releaseLocks();
            }
        }
    }

    /** @return a new {@code JobSchedulerStoreImpl} */
    @Override
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        return new JobSchedulerStoreImpl();
    }
}