001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicReference; 043 044import org.apache.activemq.broker.ConnectionContext; 045import org.apache.activemq.broker.region.BaseDestination; 046import org.apache.activemq.broker.scheduler.JobSchedulerStore; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQQueue; 049import org.apache.activemq.command.ActiveMQTempQueue; 050import org.apache.activemq.command.ActiveMQTempTopic; 051import org.apache.activemq.command.ActiveMQTopic; 052import org.apache.activemq.command.Message; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageId; 055import org.apache.activemq.command.ProducerId; 056import org.apache.activemq.command.SubscriptionInfo; 057import org.apache.activemq.command.TransactionId; 058import org.apache.activemq.openwire.OpenWireFormat; 059import org.apache.activemq.protobuf.Buffer; 060import org.apache.activemq.store.AbstractMessageStore; 061import org.apache.activemq.store.IndexListener; 062import org.apache.activemq.store.ListenableFuture; 063import org.apache.activemq.store.MessageRecoveryListener; 064import org.apache.activemq.store.MessageStore; 065import org.apache.activemq.store.PersistenceAdapter; 066import org.apache.activemq.store.TopicMessageStore; 067import org.apache.activemq.store.TransactionIdTransformer; 068import org.apache.activemq.store.TransactionStore; 069import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 070import org.apache.activemq.store.kahadb.data.KahaDestination; 071import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 072import org.apache.activemq.store.kahadb.data.KahaLocation; 073import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 074import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 075import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 076import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 077import org.apache.activemq.store.kahadb.disk.journal.Location; 078import org.apache.activemq.store.kahadb.disk.page.Transaction; 079import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 080import org.apache.activemq.usage.MemoryUsage; 081import org.apache.activemq.usage.SystemUsage; 082import org.apache.activemq.util.ServiceStopper; 083import org.apache.activemq.util.ThreadPoolUtils; 084import org.apache.activemq.wireformat.WireFormat; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 089 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 090 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 091 092 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 093 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 094 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 095 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 096 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 097 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 098 099 protected ExecutorService queueExecutor; 100 protected ExecutorService topicExecutor; 101 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 102 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 103 final WireFormat wireFormat = new OpenWireFormat(); 104 private SystemUsage usageManager; 105 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 106 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 107 Semaphore globalQueueSemaphore; 108 Semaphore globalTopicSemaphore; 109 private boolean concurrentStoreAndDispatchQueues = true; 110 // when true, message order may be compromised when cache is exhausted if store is out 111 // or order w.r.t cache 112 private boolean concurrentStoreAndDispatchTopics = false; 113 private final boolean concurrentStoreAndDispatchTransactions = false; 114 private int maxAsyncJobs = MAX_ASYNC_JOBS; 115 private final KahaDBTransactionStore transactionStore; 116 private TransactionIdTransformer transactionIdTransformer; 117 118 public KahaDBStore() { 119 this.transactionStore = new KahaDBTransactionStore(this); 120 this.transactionIdTransformer = new TransactionIdTransformer() { 121 @Override 122 public TransactionId transform(TransactionId txid) { 123 return txid; 124 } 125 }; 126 } 127 128 @Override 129 public String toString() { 130 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 131 } 132 133 @Override 134 public void setBrokerName(String brokerName) { 135 } 136 137 @Override 138 public void setUsageManager(SystemUsage usageManager) { 139 this.usageManager = usageManager; 140 } 141 142 public SystemUsage getUsageManager() { 143 return this.usageManager; 144 } 145 146 /** 147 * @return the concurrentStoreAndDispatch 148 */ 149 public boolean isConcurrentStoreAndDispatchQueues() { 150 return this.concurrentStoreAndDispatchQueues; 151 } 152 153 /** 154 * @param concurrentStoreAndDispatch 155 * the concurrentStoreAndDispatch to set 156 */ 157 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 158 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 159 } 160 161 /** 162 * @return the concurrentStoreAndDispatch 163 */ 164 public boolean isConcurrentStoreAndDispatchTopics() { 165 return this.concurrentStoreAndDispatchTopics; 166 } 167 168 /** 169 * @param concurrentStoreAndDispatch 170 * the concurrentStoreAndDispatch to set 171 */ 172 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 173 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 174 } 175 176 public boolean isConcurrentStoreAndDispatchTransactions() { 177 return this.concurrentStoreAndDispatchTransactions; 178 } 179 180 /** 181 * @return the maxAsyncJobs 182 */ 183 public int getMaxAsyncJobs() { 184 return this.maxAsyncJobs; 185 } 186 187 /** 188 * @param maxAsyncJobs 189 * the maxAsyncJobs to set 190 */ 191 public void setMaxAsyncJobs(int maxAsyncJobs) { 192 this.maxAsyncJobs = maxAsyncJobs; 193 } 194 195 @Override 196 public void doStart() throws Exception { 197 if (brokerService != null) { 198 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 199 wireFormat.setVersion(metadata.openwireVersion); 200 201 if (LOG.isDebugEnabled()) { 202 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 203 } 204 205 } 206 super.doStart(); 207 208 if (brokerService != null) { 209 // In case the recovered store used a different OpenWire version log a warning 210 // to assist in determining why journal reads fail. 211 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 212 LOG.warn("Recovered Store uses a different OpenWire version[{}] " + 213 "than the version configured[{}].", 214 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 215 } 216 } 217 218 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 219 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 220 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 221 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 222 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 223 asyncQueueJobQueue, new ThreadFactory() { 224 @Override 225 public Thread newThread(Runnable runnable) { 226 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 227 thread.setDaemon(true); 228 return thread; 229 } 230 }); 231 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 232 asyncTopicJobQueue, new ThreadFactory() { 233 @Override 234 public Thread newThread(Runnable runnable) { 235 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 236 thread.setDaemon(true); 237 return thread; 238 } 239 }); 240 } 241 242 @Override 243 public void doStop(ServiceStopper stopper) throws Exception { 244 // drain down async jobs 245 LOG.info("Stopping async queue tasks"); 246 if (this.globalQueueSemaphore != null) { 247 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 248 } 249 synchronized (this.asyncQueueMaps) { 250 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 251 synchronized (m) { 252 for (StoreTask task : m.values()) { 253 task.cancel(); 254 } 255 } 256 } 257 this.asyncQueueMaps.clear(); 258 } 259 LOG.info("Stopping async topic tasks"); 260 if (this.globalTopicSemaphore != null) { 261 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 262 } 263 synchronized (this.asyncTopicMaps) { 264 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 265 synchronized (m) { 266 for (StoreTask task : m.values()) { 267 task.cancel(); 268 } 269 } 270 } 271 this.asyncTopicMaps.clear(); 272 } 273 if (this.globalQueueSemaphore != null) { 274 this.globalQueueSemaphore.drainPermits(); 275 } 276 if (this.globalTopicSemaphore != null) { 277 this.globalTopicSemaphore.drainPermits(); 278 } 279 if (this.queueExecutor != null) { 280 ThreadPoolUtils.shutdownNow(queueExecutor); 281 queueExecutor = null; 282 } 283 if (this.topicExecutor != null) { 284 ThreadPoolUtils.shutdownNow(topicExecutor); 285 topicExecutor = null; 286 } 287 LOG.info("Stopped KahaDB"); 288 super.doStop(stopper); 289 } 290 291 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 292 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 293 @Override 294 public Location execute(Transaction tx) throws IOException { 295 StoredDestination sd = getStoredDestination(destination, tx); 296 Long sequence = sd.messageIdIndex.get(tx, key); 297 if (sequence == null) { 298 return null; 299 } 300 return sd.orderIndex.get(tx, sequence).location; 301 } 302 }); 303 } 304 305 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 306 StoreQueueTask task = null; 307 synchronized (store.asyncTaskMap) { 308 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 309 } 310 return task; 311 } 312 313 // with asyncTaskMap locked 314 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 315 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 316 this.queueExecutor.execute(task); 317 } 318 319 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 320 StoreTopicTask task = null; 321 synchronized (store.asyncTaskMap) { 322 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 323 } 324 return task; 325 } 326 327 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 328 synchronized (store.asyncTaskMap) { 329 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 330 } 331 this.topicExecutor.execute(task); 332 } 333 334 @Override 335 public TransactionStore createTransactionStore() throws IOException { 336 return this.transactionStore; 337 } 338 339 public boolean getForceRecoverIndex() { 340 return this.forceRecoverIndex; 341 } 342 343 public void setForceRecoverIndex(boolean forceRecoverIndex) { 344 this.forceRecoverIndex = forceRecoverIndex; 345 } 346 347 public class KahaDBMessageStore extends AbstractMessageStore { 348 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 349 protected KahaDestination dest; 350 private final int maxAsyncJobs; 351 private final Semaphore localDestinationSemaphore; 352 353 double doneTasks, canceledTasks = 0; 354 355 public KahaDBMessageStore(ActiveMQDestination destination) { 356 super(destination); 357 this.dest = convert(destination); 358 this.maxAsyncJobs = getMaxAsyncJobs(); 359 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 360 } 361 362 @Override 363 public ActiveMQDestination getDestination() { 364 return destination; 365 } 366 367 @Override 368 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 369 throws IOException { 370 if (isConcurrentStoreAndDispatchQueues()) { 371 message.beforeMarshall(wireFormat); 372 StoreQueueTask result = new StoreQueueTask(this, context, message); 373 ListenableFuture<Object> future = result.getFuture(); 374 message.getMessageId().setFutureOrSequenceLong(future); 375 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 376 result.aquireLocks(); 377 synchronized (asyncTaskMap) { 378 addQueueTask(this, result); 379 if (indexListener != null) { 380 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 381 } 382 } 383 return future; 384 } else { 385 return super.asyncAddQueueMessage(context, message); 386 } 387 } 388 389 @Override 390 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 391 if (isConcurrentStoreAndDispatchQueues()) { 392 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 393 StoreQueueTask task = null; 394 synchronized (asyncTaskMap) { 395 task = (StoreQueueTask) asyncTaskMap.get(key); 396 } 397 if (task != null) { 398 if (ack.isInTransaction() || !task.cancel()) { 399 try { 400 task.future.get(); 401 } catch (InterruptedException e) { 402 throw new InterruptedIOException(e.toString()); 403 } catch (Exception ignored) { 404 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 405 } 406 removeMessage(context, ack); 407 } else { 408 indexLock.writeLock().lock(); 409 try { 410 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 411 } finally { 412 indexLock.writeLock().unlock(); 413 } 414 synchronized (asyncTaskMap) { 415 asyncTaskMap.remove(key); 416 } 417 } 418 } else { 419 removeMessage(context, ack); 420 } 421 } else { 422 removeMessage(context, ack); 423 } 424 } 425 426 @Override 427 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 428 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 429 command.setDestination(dest); 430 command.setMessageId(message.getMessageId().toProducerKey()); 431 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 432 command.setPriority(message.getPriority()); 433 command.setPrioritySupported(isPrioritizedMessages()); 434 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 435 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 436 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 437 // sync add? (for async, future present from getFutureOrSequenceLong) 438 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 439 440 @Override 441 public void sequenceAssignedWithIndexLocked(final long sequence) { 442 message.getMessageId().setFutureOrSequenceLong(sequence); 443 if (indexListener != null) { 444 if (possibleFuture == null) { 445 trackPendingAdd(dest, sequence); 446 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 447 @Override 448 public void run() { 449 trackPendingAddComplete(dest, sequence); 450 } 451 })); 452 } 453 } 454 } 455 }, null); 456 } 457 458 @Override 459 public void updateMessage(Message message) throws IOException { 460 if (LOG.isTraceEnabled()) { 461 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 462 } 463 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 464 KahaAddMessageCommand command = new KahaAddMessageCommand(); 465 command.setDestination(dest); 466 command.setMessageId(message.getMessageId().toProducerKey()); 467 command.setPriority(message.getPriority()); 468 command.setPrioritySupported(prioritizedMessages); 469 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 470 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 471 updateMessageCommand.setMessage(command); 472 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 473 } 474 475 @Override 476 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 477 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 478 command.setDestination(dest); 479 command.setMessageId(ack.getLastMessageId().toProducerKey()); 480 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 481 482 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 483 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 484 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 485 } 486 487 @Override 488 public void removeAllMessages(ConnectionContext context) throws IOException { 489 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 490 command.setDestination(dest); 491 store(command, true, null, null); 492 } 493 494 @Override 495 public Message getMessage(MessageId identity) throws IOException { 496 final String key = identity.toProducerKey(); 497 498 // Hopefully one day the page file supports concurrent read 499 // operations... but for now we must 500 // externally synchronize... 501 Location location; 502 indexLock.writeLock().lock(); 503 try { 504 location = findMessageLocation(key, dest); 505 } finally { 506 indexLock.writeLock().unlock(); 507 } 508 if (location == null) { 509 return null; 510 } 511 512 return loadMessage(location); 513 } 514 515 @Override 516 public int getMessageCount() throws IOException { 517 try { 518 lockAsyncJobQueue(); 519 indexLock.writeLock().lock(); 520 try { 521 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 522 @Override 523 public Integer execute(Transaction tx) throws IOException { 524 // Iterate through all index entries to get a count 525 // of messages in the destination. 526 StoredDestination sd = getStoredDestination(dest, tx); 527 int rc = 0; 528 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 529 iterator.next(); 530 rc++; 531 } 532 return rc; 533 } 534 }); 535 } finally { 536 indexLock.writeLock().unlock(); 537 } 538 } finally { 539 unlockAsyncJobQueue(); 540 } 541 } 542 543 @Override 544 public boolean isEmpty() throws IOException { 545 indexLock.writeLock().lock(); 546 try { 547 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 548 @Override 549 public Boolean execute(Transaction tx) throws IOException { 550 // Iterate through all index entries to get a count of 551 // messages in the destination. 552 StoredDestination sd = getStoredDestination(dest, tx); 553 return sd.locationIndex.isEmpty(tx); 554 } 555 }); 556 } finally { 557 indexLock.writeLock().unlock(); 558 } 559 } 560 561 @Override 562 public void recover(final MessageRecoveryListener listener) throws Exception { 563 // recovery may involve expiry which will modify 564 indexLock.writeLock().lock(); 565 try { 566 pageFile.tx().execute(new Transaction.Closure<Exception>() { 567 @Override 568 public void execute(Transaction tx) throws Exception { 569 StoredDestination sd = getStoredDestination(dest, tx); 570 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 571 sd.orderIndex.resetCursorPosition(); 572 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 573 .hasNext(); ) { 574 Entry<Long, MessageKeys> entry = iterator.next(); 575 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 576 continue; 577 } 578 Message msg = loadMessage(entry.getValue().location); 579 listener.recoverMessage(msg); 580 } 581 } 582 }); 583 } finally { 584 indexLock.writeLock().unlock(); 585 } 586 } 587 588 @Override 589 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 590 indexLock.writeLock().lock(); 591 try { 592 pageFile.tx().execute(new Transaction.Closure<Exception>() { 593 @Override 594 public void execute(Transaction tx) throws Exception { 595 StoredDestination sd = getStoredDestination(dest, tx); 596 Entry<Long, MessageKeys> entry = null; 597 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 598 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 599 entry = iterator.next(); 600 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 601 continue; 602 } 603 Message msg = loadMessage(entry.getValue().location); 604 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 605 listener.recoverMessage(msg); 606 counter++; 607 if (counter >= maxReturned) { 608 break; 609 } 610 } 611 sd.orderIndex.stoppedIterating(); 612 } 613 }); 614 } finally { 615 indexLock.writeLock().unlock(); 616 } 617 } 618 619 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 620 int counter = 0; 621 String id; 622 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 623 id = iterator.next(); 624 iterator.remove(); 625 Long sequence = sd.messageIdIndex.get(tx, id); 626 if (sequence != null) { 627 if (sd.orderIndex.alreadyDispatched(sequence)) { 628 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 629 counter++; 630 if (counter >= maxReturned) { 631 break; 632 } 633 } else { 634 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 635 } 636 } else { 637 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 638 } 639 } 640 return counter; 641 } 642 643 644 @Override 645 public void resetBatching() { 646 if (pageFile.isLoaded()) { 647 indexLock.writeLock().lock(); 648 try { 649 pageFile.tx().execute(new Transaction.Closure<Exception>() { 650 @Override 651 public void execute(Transaction tx) throws Exception { 652 StoredDestination sd = getExistingStoredDestination(dest, tx); 653 if (sd != null) { 654 sd.orderIndex.resetCursorPosition();} 655 } 656 }); 657 } catch (Exception e) { 658 LOG.error("Failed to reset batching",e); 659 } finally { 660 indexLock.writeLock().unlock(); 661 } 662 } 663 } 664 665 @Override 666 public void setBatch(final MessageId identity) throws IOException { 667 indexLock.writeLock().lock(); 668 try { 669 pageFile.tx().execute(new Transaction.Closure<IOException>() { 670 @Override 671 public void execute(Transaction tx) throws IOException { 672 StoredDestination sd = getStoredDestination(dest, tx); 673 Long location = (Long) identity.getFutureOrSequenceLong(); 674 Long pending = sd.orderIndex.minPendingAdd(); 675 if (pending != null) { 676 location = Math.min(location, pending-1); 677 } 678 sd.orderIndex.setBatch(tx, location); 679 } 680 }); 681 } finally { 682 indexLock.writeLock().unlock(); 683 } 684 } 685 686 @Override 687 public void setMemoryUsage(MemoryUsage memoryUsage) { 688 } 689 @Override 690 public void start() throws Exception { 691 super.start(); 692 } 693 @Override 694 public void stop() throws Exception { 695 super.stop(); 696 } 697 698 protected void lockAsyncJobQueue() { 699 try { 700 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 701 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 702 } 703 } catch (Exception e) { 704 LOG.error("Failed to lock async jobs for " + this.destination, e); 705 } 706 } 707 708 protected void unlockAsyncJobQueue() { 709 this.localDestinationSemaphore.release(this.maxAsyncJobs); 710 } 711 712 protected void acquireLocalAsyncLock() { 713 try { 714 this.localDestinationSemaphore.acquire(); 715 } catch (InterruptedException e) { 716 LOG.error("Failed to aquire async lock for " + this.destination, e); 717 } 718 } 719 720 protected void releaseLocalAsyncLock() { 721 this.localDestinationSemaphore.release(); 722 } 723 724 @Override 725 public String toString(){ 726 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 727 } 728 } 729 730 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 731 private final AtomicInteger subscriptionCount = new AtomicInteger(); 732 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 733 super(destination); 734 this.subscriptionCount.set(getAllSubscriptions().length); 735 if (isConcurrentStoreAndDispatchTopics()) { 736 asyncTopicMaps.add(asyncTaskMap); 737 } 738 } 739 740 @Override 741 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 742 throws IOException { 743 if (isConcurrentStoreAndDispatchTopics()) { 744 message.beforeMarshall(wireFormat); 745 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 746 result.aquireLocks(); 747 addTopicTask(this, result); 748 return result.getFuture(); 749 } else { 750 return super.asyncAddTopicMessage(context, message); 751 } 752 } 753 754 @Override 755 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 756 MessageId messageId, MessageAck ack) throws IOException { 757 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 758 if (isConcurrentStoreAndDispatchTopics()) { 759 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 760 StoreTopicTask task = null; 761 synchronized (asyncTaskMap) { 762 task = (StoreTopicTask) asyncTaskMap.get(key); 763 } 764 if (task != null) { 765 if (task.addSubscriptionKey(subscriptionKey)) { 766 removeTopicTask(this, messageId); 767 if (task.cancel()) { 768 synchronized (asyncTaskMap) { 769 asyncTaskMap.remove(key); 770 } 771 } 772 } 773 } else { 774 doAcknowledge(context, subscriptionKey, messageId, ack); 775 } 776 } else { 777 doAcknowledge(context, subscriptionKey, messageId, ack); 778 } 779 } 780 781 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 782 throws IOException { 783 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 784 command.setDestination(dest); 785 command.setSubscriptionKey(subscriptionKey); 786 command.setMessageId(messageId.toProducerKey()); 787 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 788 if (ack != null && ack.isUnmatchedAck()) { 789 command.setAck(UNMATCHED); 790 } else { 791 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 792 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 793 } 794 store(command, false, null, null); 795 } 796 797 @Override 798 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 799 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 800 .getSubscriptionName()); 801 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 802 command.setDestination(dest); 803 command.setSubscriptionKey(subscriptionKey.toString()); 804 command.setRetroactive(retroactive); 805 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 806 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 807 store(command, isEnableJournalDiskSyncs() && true, null, null); 808 this.subscriptionCount.incrementAndGet(); 809 } 810 811 @Override 812 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 813 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 814 command.setDestination(dest); 815 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 816 store(command, isEnableJournalDiskSyncs() && true, null, null); 817 this.subscriptionCount.decrementAndGet(); 818 } 819 820 @Override 821 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 822 823 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 824 indexLock.writeLock().lock(); 825 try { 826 pageFile.tx().execute(new Transaction.Closure<IOException>() { 827 @Override 828 public void execute(Transaction tx) throws IOException { 829 StoredDestination sd = getStoredDestination(dest, tx); 830 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 831 .hasNext();) { 832 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 833 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 834 .getValue().getSubscriptionInfo().newInput())); 835 subscriptions.add(info); 836 837 } 838 } 839 }); 840 } finally { 841 indexLock.writeLock().unlock(); 842 } 843 844 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 845 subscriptions.toArray(rc); 846 return rc; 847 } 848 849 @Override 850 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 851 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 852 indexLock.writeLock().lock(); 853 try { 854 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 855 @Override 856 public SubscriptionInfo execute(Transaction tx) throws IOException { 857 StoredDestination sd = getStoredDestination(dest, tx); 858 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 859 if (command == null) { 860 return null; 861 } 862 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 863 .getSubscriptionInfo().newInput())); 864 } 865 }); 866 } finally { 867 indexLock.writeLock().unlock(); 868 } 869 } 870 871 @Override 872 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 873 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 874 indexLock.writeLock().lock(); 875 try { 876 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 877 @Override 878 public Integer execute(Transaction tx) throws IOException { 879 StoredDestination sd = getStoredDestination(dest, tx); 880 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 881 if (cursorPos == null) { 882 // The subscription might not exist. 883 return 0; 884 } 885 886 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 887 } 888 }); 889 } finally { 890 indexLock.writeLock().unlock(); 891 } 892 } 893 894 @Override 895 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 896 throws Exception { 897 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 898 @SuppressWarnings("unused") 899 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 900 indexLock.writeLock().lock(); 901 try { 902 pageFile.tx().execute(new Transaction.Closure<Exception>() { 903 @Override 904 public void execute(Transaction tx) throws Exception { 905 StoredDestination sd = getStoredDestination(dest, tx); 906 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 907 sd.orderIndex.setBatch(tx, cursorPos); 908 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 909 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 910 .hasNext();) { 911 Entry<Long, MessageKeys> entry = iterator.next(); 912 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 913 continue; 914 } 915 listener.recoverMessage(loadMessage(entry.getValue().location)); 916 } 917 sd.orderIndex.resetCursorPosition(); 918 } 919 }); 920 } finally { 921 indexLock.writeLock().unlock(); 922 } 923 } 924 925 @Override 926 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 927 final MessageRecoveryListener listener) throws Exception { 928 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 929 @SuppressWarnings("unused") 930 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 931 indexLock.writeLock().lock(); 932 try { 933 pageFile.tx().execute(new Transaction.Closure<Exception>() { 934 @Override 935 public void execute(Transaction tx) throws Exception { 936 StoredDestination sd = getStoredDestination(dest, tx); 937 sd.orderIndex.resetCursorPosition(); 938 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 939 if (moc == null) { 940 LastAck pos = getLastAck(tx, sd, subscriptionKey); 941 if (pos == null) { 942 // sub deleted 943 return; 944 } 945 sd.orderIndex.setBatch(tx, pos); 946 moc = sd.orderIndex.cursor; 947 } else { 948 sd.orderIndex.cursor.sync(moc); 949 } 950 951 Entry<Long, MessageKeys> entry = null; 952 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 953 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 954 .hasNext();) { 955 entry = iterator.next(); 956 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 957 continue; 958 } 959 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 960 counter++; 961 } 962 if (counter >= maxReturned || listener.hasSpace() == false) { 963 break; 964 } 965 } 966 sd.orderIndex.stoppedIterating(); 967 if (entry != null) { 968 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 969 sd.subscriptionCursors.put(subscriptionKey, copy); 970 } 971 } 972 }); 973 } finally { 974 indexLock.writeLock().unlock(); 975 } 976 } 977 978 @Override 979 public void resetBatching(String clientId, String subscriptionName) { 980 try { 981 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 982 indexLock.writeLock().lock(); 983 try { 984 pageFile.tx().execute(new Transaction.Closure<IOException>() { 985 @Override 986 public void execute(Transaction tx) throws IOException { 987 StoredDestination sd = getStoredDestination(dest, tx); 988 sd.subscriptionCursors.remove(subscriptionKey); 989 } 990 }); 991 }finally { 992 indexLock.writeLock().unlock(); 993 } 994 } catch (IOException e) { 995 throw new RuntimeException(e); 996 } 997 } 998 } 999 1000 String subscriptionKey(String clientId, String subscriptionName) { 1001 return clientId + ":" + subscriptionName; 1002 } 1003 1004 @Override 1005 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1006 return this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1007 } 1008 1009 @Override 1010 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1011 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1012 } 1013 1014 /** 1015 * Cleanup method to remove any state associated with the given destination. 1016 * This method does not stop the message store (it might not be cached). 1017 * 1018 * @param destination 1019 * Destination to forget 1020 */ 1021 @Override 1022 public void removeQueueMessageStore(ActiveMQQueue destination) { 1023 } 1024 1025 /** 1026 * Cleanup method to remove any state associated with the given destination 1027 * This method does not stop the message store (it might not be cached). 1028 * 1029 * @param destination 1030 * Destination to forget 1031 */ 1032 @Override 1033 public void removeTopicMessageStore(ActiveMQTopic destination) { 1034 } 1035 1036 @Override 1037 public void deleteAllMessages() throws IOException { 1038 deleteAllMessages = true; 1039 } 1040 1041 @Override 1042 public Set<ActiveMQDestination> getDestinations() { 1043 try { 1044 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1045 indexLock.writeLock().lock(); 1046 try { 1047 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1048 @Override 1049 public void execute(Transaction tx) throws IOException { 1050 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1051 .hasNext();) { 1052 Entry<String, StoredDestination> entry = iterator.next(); 1053 //Removing isEmpty topic check - see AMQ-5875 1054 rc.add(convert(entry.getKey())); 1055 } 1056 } 1057 }); 1058 }finally { 1059 indexLock.writeLock().unlock(); 1060 } 1061 return rc; 1062 } catch (IOException e) { 1063 throw new RuntimeException(e); 1064 } 1065 } 1066 1067 @Override 1068 public long getLastMessageBrokerSequenceId() throws IOException { 1069 return 0; 1070 } 1071 1072 @Override 1073 public long getLastProducerSequenceId(ProducerId id) { 1074 indexLock.writeLock().lock(); 1075 try { 1076 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1077 } finally { 1078 indexLock.writeLock().unlock(); 1079 } 1080 } 1081 1082 @Override 1083 public long size() { 1084 try { 1085 return journalSize.get() + getPageFile().getDiskSize(); 1086 } catch (IOException e) { 1087 throw new RuntimeException(e); 1088 } 1089 } 1090 1091 @Override 1092 public void beginTransaction(ConnectionContext context) throws IOException { 1093 throw new IOException("Not yet implemented."); 1094 } 1095 @Override 1096 public void commitTransaction(ConnectionContext context) throws IOException { 1097 throw new IOException("Not yet implemented."); 1098 } 1099 @Override 1100 public void rollbackTransaction(ConnectionContext context) throws IOException { 1101 throw new IOException("Not yet implemented."); 1102 } 1103 1104 @Override 1105 public void checkpoint(boolean sync) throws IOException { 1106 super.checkpointCleanup(sync); 1107 } 1108 1109 // ///////////////////////////////////////////////////////////////// 1110 // Internal helper methods. 1111 // ///////////////////////////////////////////////////////////////// 1112 1113 /** 1114 * @param location 1115 * @return 1116 * @throws IOException 1117 */ 1118 Message loadMessage(Location location) throws IOException { 1119 try { 1120 JournalCommand<?> command = load(location); 1121 KahaAddMessageCommand addMessage = null; 1122 switch (command.type()) { 1123 case KAHA_UPDATE_MESSAGE_COMMAND: 1124 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1125 break; 1126 default: 1127 addMessage = (KahaAddMessageCommand) command; 1128 } 1129 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1130 return msg; 1131 } catch (IOException ioe) { 1132 LOG.error("Failed to load message at: {}", location , ioe); 1133 brokerService.handleIOException(ioe); 1134 throw ioe; 1135 } 1136 } 1137 1138 // ///////////////////////////////////////////////////////////////// 1139 // Internal conversion methods. 1140 // ///////////////////////////////////////////////////////////////// 1141 1142 KahaLocation convert(Location location) { 1143 KahaLocation rc = new KahaLocation(); 1144 rc.setLogId(location.getDataFileId()); 1145 rc.setOffset(location.getOffset()); 1146 return rc; 1147 } 1148 1149 KahaDestination convert(ActiveMQDestination dest) { 1150 KahaDestination rc = new KahaDestination(); 1151 rc.setName(dest.getPhysicalName()); 1152 switch (dest.getDestinationType()) { 1153 case ActiveMQDestination.QUEUE_TYPE: 1154 rc.setType(DestinationType.QUEUE); 1155 return rc; 1156 case ActiveMQDestination.TOPIC_TYPE: 1157 rc.setType(DestinationType.TOPIC); 1158 return rc; 1159 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1160 rc.setType(DestinationType.TEMP_QUEUE); 1161 return rc; 1162 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1163 rc.setType(DestinationType.TEMP_TOPIC); 1164 return rc; 1165 default: 1166 return null; 1167 } 1168 } 1169 1170 ActiveMQDestination convert(String dest) { 1171 int p = dest.indexOf(":"); 1172 if (p < 0) { 1173 throw new IllegalArgumentException("Not in the valid destination format"); 1174 } 1175 int type = Integer.parseInt(dest.substring(0, p)); 1176 String name = dest.substring(p + 1); 1177 return convert(type, name); 1178 } 1179 1180 private ActiveMQDestination convert(KahaDestination commandDestination) { 1181 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1182 } 1183 1184 private ActiveMQDestination convert(int type, String name) { 1185 switch (KahaDestination.DestinationType.valueOf(type)) { 1186 case QUEUE: 1187 return new ActiveMQQueue(name); 1188 case TOPIC: 1189 return new ActiveMQTopic(name); 1190 case TEMP_QUEUE: 1191 return new ActiveMQTempQueue(name); 1192 case TEMP_TOPIC: 1193 return new ActiveMQTempTopic(name); 1194 default: 1195 throw new IllegalArgumentException("Not in the valid destination format"); 1196 } 1197 } 1198 1199 public TransactionIdTransformer getTransactionIdTransformer() { 1200 return transactionIdTransformer; 1201 } 1202 1203 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1204 this.transactionIdTransformer = transactionIdTransformer; 1205 } 1206 1207 static class AsyncJobKey { 1208 MessageId id; 1209 ActiveMQDestination destination; 1210 1211 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1212 this.id = id; 1213 this.destination = destination; 1214 } 1215 1216 @Override 1217 public boolean equals(Object obj) { 1218 if (obj == this) { 1219 return true; 1220 } 1221 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1222 && destination.equals(((AsyncJobKey) obj).destination); 1223 } 1224 1225 @Override 1226 public int hashCode() { 1227 return id.hashCode() + destination.hashCode(); 1228 } 1229 1230 @Override 1231 public String toString() { 1232 return destination.getPhysicalName() + "-" + id; 1233 } 1234 } 1235 1236 public interface StoreTask { 1237 public boolean cancel(); 1238 1239 public void aquireLocks(); 1240 1241 public void releaseLocks(); 1242 } 1243 1244 class StoreQueueTask implements Runnable, StoreTask { 1245 protected final Message message; 1246 protected final ConnectionContext context; 1247 protected final KahaDBMessageStore store; 1248 protected final InnerFutureTask future; 1249 protected final AtomicBoolean done = new AtomicBoolean(); 1250 protected final AtomicBoolean locked = new AtomicBoolean(); 1251 1252 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1253 this.store = store; 1254 this.context = context; 1255 this.message = message; 1256 this.future = new InnerFutureTask(this); 1257 } 1258 1259 public ListenableFuture<Object> getFuture() { 1260 return this.future; 1261 } 1262 1263 @Override 1264 public boolean cancel() { 1265 if (this.done.compareAndSet(false, true)) { 1266 return this.future.cancel(false); 1267 } 1268 return false; 1269 } 1270 1271 @Override 1272 public void aquireLocks() { 1273 if (this.locked.compareAndSet(false, true)) { 1274 try { 1275 globalQueueSemaphore.acquire(); 1276 store.acquireLocalAsyncLock(); 1277 message.incrementReferenceCount(); 1278 } catch (InterruptedException e) { 1279 LOG.warn("Failed to aquire lock", e); 1280 } 1281 } 1282 1283 } 1284 1285 @Override 1286 public void releaseLocks() { 1287 if (this.locked.compareAndSet(true, false)) { 1288 store.releaseLocalAsyncLock(); 1289 globalQueueSemaphore.release(); 1290 message.decrementReferenceCount(); 1291 } 1292 } 1293 1294 @Override 1295 public void run() { 1296 this.store.doneTasks++; 1297 try { 1298 if (this.done.compareAndSet(false, true)) { 1299 this.store.addMessage(context, message); 1300 removeQueueTask(this.store, this.message.getMessageId()); 1301 this.future.complete(); 1302 } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) { 1303 System.err.println(this.store.dest.getName() + " cancelled: " 1304 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1305 this.store.canceledTasks = this.store.doneTasks = 0; 1306 } 1307 } catch (Exception e) { 1308 this.future.setException(e); 1309 } 1310 } 1311 1312 protected Message getMessage() { 1313 return this.message; 1314 } 1315 1316 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1317 1318 private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); 1319 1320 public InnerFutureTask(Runnable runnable) { 1321 super(runnable, null); 1322 } 1323 1324 public void setException(final Exception e) { 1325 super.setException(e); 1326 } 1327 1328 public void complete() { 1329 super.set(null); 1330 } 1331 1332 @Override 1333 public void done() { 1334 fireListener(); 1335 } 1336 1337 @Override 1338 public void addListener(Runnable listener) { 1339 this.listenerRef.set(listener); 1340 if (isDone()) { 1341 fireListener(); 1342 } 1343 } 1344 1345 private void fireListener() { 1346 Runnable listener = listenerRef.getAndSet(null); 1347 if (listener != null) { 1348 try { 1349 listener.run(); 1350 } catch (Exception ignored) { 1351 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1352 } 1353 } 1354 } 1355 } 1356 } 1357 1358 class StoreTopicTask extends StoreQueueTask { 1359 private final int subscriptionCount; 1360 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1361 private final KahaDBTopicMessageStore topicStore; 1362 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1363 int subscriptionCount) { 1364 super(store, context, message); 1365 this.topicStore = store; 1366 this.subscriptionCount = subscriptionCount; 1367 1368 } 1369 1370 @Override 1371 public void aquireLocks() { 1372 if (this.locked.compareAndSet(false, true)) { 1373 try { 1374 globalTopicSemaphore.acquire(); 1375 store.acquireLocalAsyncLock(); 1376 message.incrementReferenceCount(); 1377 } catch (InterruptedException e) { 1378 LOG.warn("Failed to aquire lock", e); 1379 } 1380 } 1381 } 1382 1383 @Override 1384 public void releaseLocks() { 1385 if (this.locked.compareAndSet(true, false)) { 1386 message.decrementReferenceCount(); 1387 store.releaseLocalAsyncLock(); 1388 globalTopicSemaphore.release(); 1389 } 1390 } 1391 1392 /** 1393 * add a key 1394 * 1395 * @param key 1396 * @return true if all acknowledgements received 1397 */ 1398 public boolean addSubscriptionKey(String key) { 1399 synchronized (this.subscriptionKeys) { 1400 this.subscriptionKeys.add(key); 1401 } 1402 return this.subscriptionKeys.size() >= this.subscriptionCount; 1403 } 1404 1405 @Override 1406 public void run() { 1407 this.store.doneTasks++; 1408 try { 1409 if (this.done.compareAndSet(false, true)) { 1410 this.topicStore.addMessage(context, message); 1411 // apply any acks we have 1412 synchronized (this.subscriptionKeys) { 1413 for (String key : this.subscriptionKeys) { 1414 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1415 1416 } 1417 } 1418 removeTopicTask(this.topicStore, this.message.getMessageId()); 1419 this.future.complete(); 1420 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1421 System.err.println(this.store.dest.getName() + " cancelled: " 1422 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1423 this.store.canceledTasks = this.store.doneTasks = 0; 1424 } 1425 } catch (Exception e) { 1426 this.future.setException(e); 1427 } 1428 } 1429 } 1430 1431 public class StoreTaskExecutor extends ThreadPoolExecutor { 1432 1433 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1434 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1435 } 1436 1437 @Override 1438 protected void afterExecute(Runnable runnable, Throwable throwable) { 1439 super.afterExecute(runnable, throwable); 1440 1441 if (runnable instanceof StoreTask) { 1442 ((StoreTask)runnable).releaseLocks(); 1443 } 1444 } 1445 } 1446 1447 @Override 1448 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1449 return new JobSchedulerStoreImpl(); 1450 } 1451}