001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicReference; 043 044import org.apache.activemq.broker.ConnectionContext; 045import org.apache.activemq.broker.region.BaseDestination; 046import org.apache.activemq.broker.scheduler.JobSchedulerStore; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQQueue; 049import org.apache.activemq.command.ActiveMQTempQueue; 050import org.apache.activemq.command.ActiveMQTempTopic; 051import org.apache.activemq.command.ActiveMQTopic; 052import org.apache.activemq.command.Message; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageId; 055import org.apache.activemq.command.ProducerId; 056import org.apache.activemq.command.SubscriptionInfo; 057import org.apache.activemq.command.TransactionId; 058import org.apache.activemq.openwire.OpenWireFormat; 059import org.apache.activemq.protobuf.Buffer; 060import org.apache.activemq.store.AbstractMessageStore; 061import org.apache.activemq.store.IndexListener; 062import org.apache.activemq.store.ListenableFuture; 063import org.apache.activemq.store.MessageRecoveryListener; 064import org.apache.activemq.store.MessageStore; 065import org.apache.activemq.store.PersistenceAdapter; 066import org.apache.activemq.store.ProxyMessageStore; 067import org.apache.activemq.store.TopicMessageStore; 068import org.apache.activemq.store.TransactionIdTransformer; 069import org.apache.activemq.store.TransactionStore; 070import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 071import org.apache.activemq.store.kahadb.data.KahaDestination; 072import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 073import org.apache.activemq.store.kahadb.data.KahaLocation; 074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 076import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 077import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 078import org.apache.activemq.store.kahadb.disk.journal.Location; 079import org.apache.activemq.store.kahadb.disk.page.Transaction; 080import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 081import org.apache.activemq.usage.MemoryUsage; 082import org.apache.activemq.usage.SystemUsage; 083import org.apache.activemq.util.IOExceptionSupport; 084import org.apache.activemq.util.ServiceStopper; 085import org.apache.activemq.util.ThreadPoolUtils; 086import org.apache.activemq.wireformat.WireFormat; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 091 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 092 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 093 094 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 095 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 096 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 097 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 098 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 099 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 100 101 protected ExecutorService queueExecutor; 102 protected ExecutorService topicExecutor; 103 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 104 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 105 final WireFormat wireFormat = new OpenWireFormat(); 106 private SystemUsage usageManager; 107 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 108 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 109 Semaphore globalQueueSemaphore; 110 Semaphore globalTopicSemaphore; 111 private boolean concurrentStoreAndDispatchQueues = true; 112 // when true, message order may be compromised when cache is exhausted if store is out 113 // or order w.r.t cache 114 private boolean concurrentStoreAndDispatchTopics = false; 115 private final boolean concurrentStoreAndDispatchTransactions = false; 116 private int maxAsyncJobs = MAX_ASYNC_JOBS; 117 private final KahaDBTransactionStore transactionStore; 118 private TransactionIdTransformer transactionIdTransformer; 119 120 public KahaDBStore() { 121 this.transactionStore = new KahaDBTransactionStore(this); 122 this.transactionIdTransformer = new TransactionIdTransformer() { 123 @Override 124 public TransactionId transform(TransactionId txid) { 125 return txid; 126 } 127 }; 128 } 129 130 @Override 131 public String toString() { 132 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 133 } 134 135 @Override 136 public void setBrokerName(String brokerName) { 137 } 138 139 @Override 140 public void setUsageManager(SystemUsage usageManager) { 141 this.usageManager = usageManager; 142 } 143 144 public SystemUsage getUsageManager() { 145 return this.usageManager; 146 } 147 148 /** 149 * @return the concurrentStoreAndDispatch 150 */ 151 public boolean isConcurrentStoreAndDispatchQueues() { 152 return this.concurrentStoreAndDispatchQueues; 153 } 154 155 /** 156 * @param concurrentStoreAndDispatch 157 * the concurrentStoreAndDispatch to set 158 */ 159 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 160 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 161 } 162 163 /** 164 * @return the concurrentStoreAndDispatch 165 */ 166 public boolean isConcurrentStoreAndDispatchTopics() { 167 return this.concurrentStoreAndDispatchTopics; 168 } 169 170 /** 171 * @param concurrentStoreAndDispatch 172 * the concurrentStoreAndDispatch to set 173 */ 174 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 175 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 176 } 177 178 public boolean isConcurrentStoreAndDispatchTransactions() { 179 return this.concurrentStoreAndDispatchTransactions; 180 } 181 182 /** 183 * @return the maxAsyncJobs 184 */ 185 public int getMaxAsyncJobs() { 186 return this.maxAsyncJobs; 187 } 188 189 /** 190 * @param maxAsyncJobs 191 * the maxAsyncJobs to set 192 */ 193 public void setMaxAsyncJobs(int maxAsyncJobs) { 194 this.maxAsyncJobs = maxAsyncJobs; 195 } 196 197 @Override 198 public void doStart() throws Exception { 199 if (brokerService != null) { 200 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 201 wireFormat.setVersion(metadata.openwireVersion); 202 203 if (LOG.isDebugEnabled()) { 204 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 205 } 206 207 } 208 super.doStart(); 209 210 if (brokerService != null) { 211 // In case the recovered store used a different OpenWire version log a warning 212 // to assist in determining why journal reads fail. 213 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 214 LOG.warn("Recovered Store uses a different OpenWire version[{}] " + 215 "than the version configured[{}].", 216 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 217 } 218 } 219 220 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 221 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 222 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 223 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 224 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 225 asyncQueueJobQueue, new ThreadFactory() { 226 @Override 227 public Thread newThread(Runnable runnable) { 228 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 229 thread.setDaemon(true); 230 return thread; 231 } 232 }); 233 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 234 asyncTopicJobQueue, new ThreadFactory() { 235 @Override 236 public Thread newThread(Runnable runnable) { 237 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 238 thread.setDaemon(true); 239 return thread; 240 } 241 }); 242 } 243 244 @Override 245 public void doStop(ServiceStopper stopper) throws Exception { 246 // drain down async jobs 247 LOG.info("Stopping async queue tasks"); 248 if (this.globalQueueSemaphore != null) { 249 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 250 } 251 synchronized (this.asyncQueueMaps) { 252 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 253 synchronized (m) { 254 for (StoreTask task : m.values()) { 255 task.cancel(); 256 } 257 } 258 } 259 this.asyncQueueMaps.clear(); 260 } 261 LOG.info("Stopping async topic tasks"); 262 if (this.globalTopicSemaphore != null) { 263 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 264 } 265 synchronized (this.asyncTopicMaps) { 266 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 267 synchronized (m) { 268 for (StoreTask task : m.values()) { 269 task.cancel(); 270 } 271 } 272 } 273 this.asyncTopicMaps.clear(); 274 } 275 if (this.globalQueueSemaphore != null) { 276 this.globalQueueSemaphore.drainPermits(); 277 } 278 if (this.globalTopicSemaphore != null) { 279 this.globalTopicSemaphore.drainPermits(); 280 } 281 if (this.queueExecutor != null) { 282 ThreadPoolUtils.shutdownNow(queueExecutor); 283 queueExecutor = null; 284 } 285 if (this.topicExecutor != null) { 286 ThreadPoolUtils.shutdownNow(topicExecutor); 287 topicExecutor = null; 288 } 289 LOG.info("Stopped KahaDB"); 290 super.doStop(stopper); 291 } 292 293 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 294 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 295 @Override 296 public Location execute(Transaction tx) throws IOException { 297 StoredDestination sd = getStoredDestination(destination, tx); 298 Long sequence = sd.messageIdIndex.get(tx, key); 299 if (sequence == null) { 300 return null; 301 } 302 return sd.orderIndex.get(tx, sequence).location; 303 } 304 }); 305 } 306 307 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 308 StoreQueueTask task = null; 309 synchronized (store.asyncTaskMap) { 310 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 311 } 312 return task; 313 } 314 315 // with asyncTaskMap locked 316 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 317 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 318 this.queueExecutor.execute(task); 319 } 320 321 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 322 StoreTopicTask task = null; 323 synchronized (store.asyncTaskMap) { 324 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 325 } 326 return task; 327 } 328 329 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 330 synchronized (store.asyncTaskMap) { 331 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 332 } 333 this.topicExecutor.execute(task); 334 } 335 336 @Override 337 public TransactionStore createTransactionStore() throws IOException { 338 return this.transactionStore; 339 } 340 341 public boolean getForceRecoverIndex() { 342 return this.forceRecoverIndex; 343 } 344 345 public void setForceRecoverIndex(boolean forceRecoverIndex) { 346 this.forceRecoverIndex = forceRecoverIndex; 347 } 348 349 public void forgetRecoveredAcks(ArrayList<MessageAck> preparedAcks, boolean isRollback) throws IOException { 350 if (preparedAcks != null) { 351 Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>(); 352 for (MessageAck ack : preparedAcks) { 353 stores.put(ack.getDestination(), findMatchingStore(ack.getDestination())); 354 } 355 ArrayList<MessageAck> perStoreAcks = new ArrayList<>(); 356 for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) { 357 for (MessageAck ack : preparedAcks) { 358 if (entry.getKey().equals(ack.getDestination())) { 359 perStoreAcks.add(ack); 360 } 361 } 362 entry.getValue().forgetRecoveredAcks(perStoreAcks, isRollback); 363 perStoreAcks.clear(); 364 } 365 } 366 } 367 368 public void trackRecoveredAcks(ArrayList<MessageAck> preparedAcks) throws IOException { 369 Map<ActiveMQDestination, KahaDBMessageStore> stores = new HashMap<>(); 370 for (MessageAck ack : preparedAcks) { 371 stores.put(ack.getDestination(), findMatchingStore(ack.getDestination())); 372 } 373 ArrayList<MessageAck> perStoreAcks = new ArrayList<>(); 374 for (Entry<ActiveMQDestination, KahaDBMessageStore> entry : stores.entrySet()) { 375 for (MessageAck ack : preparedAcks) { 376 if (entry.getKey().equals(ack.getDestination())) { 377 perStoreAcks.add(ack); 378 } 379 } 380 entry.getValue().trackRecoveredAcks(perStoreAcks); 381 perStoreAcks.clear(); 382 } 383 } 384 385 private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestination) throws IOException { 386 ProxyMessageStore store = (ProxyMessageStore) storeCache.get(convert(activeMQDestination)); 387 if (store == null) { 388 if (activeMQDestination.isQueue()) { 389 store = (ProxyMessageStore) createQueueMessageStore((ActiveMQQueue) activeMQDestination); 390 } else { 391 store = (ProxyMessageStore) createTopicMessageStore((ActiveMQTopic) activeMQDestination); 392 } 393 } 394 return (KahaDBMessageStore) store.getDelegate(); 395 } 396 397 public class KahaDBMessageStore extends AbstractMessageStore { 398 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 399 protected KahaDestination dest; 400 private final int maxAsyncJobs; 401 private final Semaphore localDestinationSemaphore; 402 protected final Set<String> ackedAndPrepared = new HashSet<>(); 403 protected final Set<String> rolledBackAcks = new HashSet<>(); 404 405 double doneTasks, canceledTasks = 0; 406 407 public KahaDBMessageStore(ActiveMQDestination destination) { 408 super(destination); 409 this.dest = convert(destination); 410 this.maxAsyncJobs = getMaxAsyncJobs(); 411 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 412 } 413 414 @Override 415 public ActiveMQDestination getDestination() { 416 return destination; 417 } 418 419 420 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 421 // till then they are skipped by the store. 422 // 'at most once' XA guarantee 423 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 424 indexLock.writeLock().lock(); 425 try { 426 for (MessageAck ack : acks) { 427 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 428 } 429 } finally { 430 indexLock.writeLock().unlock(); 431 } 432 } 433 434 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 435 if (acks != null) { 436 indexLock.writeLock().lock(); 437 try { 438 for (MessageAck ack : acks) { 439 final String id = ack.getLastMessageId().toProducerKey(); 440 ackedAndPrepared.remove(id); 441 if (rollback) { 442 rolledBackAcks.add(id); 443 //incrementAndAddSizeToStoreStat(dest, 0); 444 } 445 } 446 } finally { 447 indexLock.writeLock().unlock(); 448 } 449 } 450 } 451 452 @Override 453 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 454 throws IOException { 455 if (isConcurrentStoreAndDispatchQueues()) { 456 message.beforeMarshall(wireFormat); 457 StoreQueueTask result = new StoreQueueTask(this, context, message); 458 ListenableFuture<Object> future = result.getFuture(); 459 message.getMessageId().setFutureOrSequenceLong(future); 460 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 461 result.aquireLocks(); 462 synchronized (asyncTaskMap) { 463 addQueueTask(this, result); 464 if (indexListener != null) { 465 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 466 } 467 } 468 return future; 469 } else { 470 return super.asyncAddQueueMessage(context, message); 471 } 472 } 473 474 @Override 475 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 476 if (isConcurrentStoreAndDispatchQueues()) { 477 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 478 StoreQueueTask task = null; 479 synchronized (asyncTaskMap) { 480 task = (StoreQueueTask) asyncTaskMap.get(key); 481 } 482 if (task != null) { 483 if (ack.isInTransaction() || !task.cancel()) { 484 try { 485 task.future.get(); 486 } catch (InterruptedException e) { 487 throw new InterruptedIOException(e.toString()); 488 } catch (Exception ignored) { 489 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 490 } 491 removeMessage(context, ack); 492 } else { 493 indexLock.writeLock().lock(); 494 try { 495 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 496 } finally { 497 indexLock.writeLock().unlock(); 498 } 499 synchronized (asyncTaskMap) { 500 asyncTaskMap.remove(key); 501 } 502 } 503 } else { 504 removeMessage(context, ack); 505 } 506 } else { 507 removeMessage(context, ack); 508 } 509 } 510 511 @Override 512 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 513 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 514 command.setDestination(dest); 515 command.setMessageId(message.getMessageId().toProducerKey()); 516 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 517 command.setPriority(message.getPriority()); 518 command.setPrioritySupported(isPrioritizedMessages()); 519 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 520 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 521 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 522 // sync add? (for async, future present from getFutureOrSequenceLong) 523 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 524 525 @Override 526 public void sequenceAssignedWithIndexLocked(final long sequence) { 527 message.getMessageId().setFutureOrSequenceLong(sequence); 528 if (indexListener != null) { 529 if (possibleFuture == null) { 530 trackPendingAdd(dest, sequence); 531 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 532 @Override 533 public void run() { 534 trackPendingAddComplete(dest, sequence); 535 } 536 })); 537 } 538 } 539 } 540 }, null); 541 } 542 543 @Override 544 public void updateMessage(Message message) throws IOException { 545 if (LOG.isTraceEnabled()) { 546 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 547 } 548 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 549 KahaAddMessageCommand command = new KahaAddMessageCommand(); 550 command.setDestination(dest); 551 command.setMessageId(message.getMessageId().toProducerKey()); 552 command.setPriority(message.getPriority()); 553 command.setPrioritySupported(prioritizedMessages); 554 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 555 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 556 updateMessageCommand.setMessage(command); 557 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 558 } 559 560 @Override 561 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 562 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 563 command.setDestination(dest); 564 command.setMessageId(ack.getLastMessageId().toProducerKey()); 565 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 566 567 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 568 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 569 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 570 } 571 572 @Override 573 public void removeAllMessages(ConnectionContext context) throws IOException { 574 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 575 command.setDestination(dest); 576 store(command, true, null, null); 577 } 578 579 @Override 580 public Message getMessage(MessageId identity) throws IOException { 581 final String key = identity.toProducerKey(); 582 583 // Hopefully one day the page file supports concurrent read 584 // operations... but for now we must 585 // externally synchronize... 586 Location location; 587 indexLock.writeLock().lock(); 588 try { 589 location = findMessageLocation(key, dest); 590 } finally { 591 indexLock.writeLock().unlock(); 592 } 593 if (location == null) { 594 return null; 595 } 596 597 return loadMessage(location); 598 } 599 600 @Override 601 public int getMessageCount() throws IOException { 602 try { 603 lockAsyncJobQueue(); 604 indexLock.writeLock().lock(); 605 try { 606 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 607 @Override 608 public Integer execute(Transaction tx) throws IOException { 609 // Iterate through all index entries to get a count 610 // of messages in the destination. 611 StoredDestination sd = getStoredDestination(dest, tx); 612 int rc = 0; 613 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 614 iterator.next(); 615 rc++; 616 } 617 rc = rc - ackedAndPrepared.size(); 618 return rc; 619 } 620 }); 621 } finally { 622 indexLock.writeLock().unlock(); 623 } 624 } finally { 625 unlockAsyncJobQueue(); 626 } 627 } 628 629 @Override 630 public boolean isEmpty() throws IOException { 631 indexLock.writeLock().lock(); 632 try { 633 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 634 @Override 635 public Boolean execute(Transaction tx) throws IOException { 636 // Iterate through all index entries to get a count of 637 // messages in the destination. 638 StoredDestination sd = getStoredDestination(dest, tx); 639 return sd.locationIndex.isEmpty(tx); 640 } 641 }); 642 } finally { 643 indexLock.writeLock().unlock(); 644 } 645 } 646 647 @Override 648 public void recover(final MessageRecoveryListener listener) throws Exception { 649 // recovery may involve expiry which will modify 650 indexLock.writeLock().lock(); 651 try { 652 pageFile.tx().execute(new Transaction.Closure<Exception>() { 653 @Override 654 public void execute(Transaction tx) throws Exception { 655 StoredDestination sd = getStoredDestination(dest, tx); 656 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 657 sd.orderIndex.resetCursorPosition(); 658 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 659 .hasNext(); ) { 660 Entry<Long, MessageKeys> entry = iterator.next(); 661 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 662 continue; 663 } 664 Message msg = loadMessage(entry.getValue().location); 665 listener.recoverMessage(msg); 666 } 667 } 668 }); 669 } finally { 670 indexLock.writeLock().unlock(); 671 } 672 } 673 674 @Override 675 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 676 indexLock.writeLock().lock(); 677 try { 678 pageFile.tx().execute(new Transaction.Closure<Exception>() { 679 @Override 680 public void execute(Transaction tx) throws Exception { 681 StoredDestination sd = getStoredDestination(dest, tx); 682 Entry<Long, MessageKeys> entry = null; 683 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 684 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 685 entry = iterator.next(); 686 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 687 continue; 688 } 689 Message msg = loadMessage(entry.getValue().location); 690 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 691 listener.recoverMessage(msg); 692 counter++; 693 if (counter >= maxReturned) { 694 break; 695 } 696 } 697 sd.orderIndex.stoppedIterating(); 698 } 699 }); 700 } finally { 701 indexLock.writeLock().unlock(); 702 } 703 } 704 705 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 706 int counter = 0; 707 String id; 708 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 709 id = iterator.next(); 710 iterator.remove(); 711 Long sequence = sd.messageIdIndex.get(tx, id); 712 if (sequence != null) { 713 if (sd.orderIndex.alreadyDispatched(sequence)) { 714 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 715 counter++; 716 if (counter >= maxReturned) { 717 break; 718 } 719 } else { 720 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 721 } 722 } else { 723 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 724 } 725 } 726 return counter; 727 } 728 729 730 @Override 731 public void resetBatching() { 732 if (pageFile.isLoaded()) { 733 indexLock.writeLock().lock(); 734 try { 735 pageFile.tx().execute(new Transaction.Closure<Exception>() { 736 @Override 737 public void execute(Transaction tx) throws Exception { 738 StoredDestination sd = getExistingStoredDestination(dest, tx); 739 if (sd != null) { 740 sd.orderIndex.resetCursorPosition();} 741 } 742 }); 743 } catch (Exception e) { 744 LOG.error("Failed to reset batching",e); 745 } finally { 746 indexLock.writeLock().unlock(); 747 } 748 } 749 } 750 751 @Override 752 public void setBatch(final MessageId identity) throws IOException { 753 indexLock.writeLock().lock(); 754 try { 755 pageFile.tx().execute(new Transaction.Closure<IOException>() { 756 @Override 757 public void execute(Transaction tx) throws IOException { 758 StoredDestination sd = getStoredDestination(dest, tx); 759 Long location = (Long) identity.getFutureOrSequenceLong(); 760 Long pending = sd.orderIndex.minPendingAdd(); 761 if (pending != null) { 762 location = Math.min(location, pending-1); 763 } 764 sd.orderIndex.setBatch(tx, location); 765 } 766 }); 767 } finally { 768 indexLock.writeLock().unlock(); 769 } 770 } 771 772 @Override 773 public void setMemoryUsage(MemoryUsage memoryUsage) { 774 } 775 @Override 776 public void start() throws Exception { 777 super.start(); 778 } 779 @Override 780 public void stop() throws Exception { 781 super.stop(); 782 } 783 784 protected void lockAsyncJobQueue() { 785 try { 786 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 787 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 788 } 789 } catch (Exception e) { 790 LOG.error("Failed to lock async jobs for " + this.destination, e); 791 } 792 } 793 794 protected void unlockAsyncJobQueue() { 795 this.localDestinationSemaphore.release(this.maxAsyncJobs); 796 } 797 798 protected void acquireLocalAsyncLock() { 799 try { 800 this.localDestinationSemaphore.acquire(); 801 } catch (InterruptedException e) { 802 LOG.error("Failed to aquire async lock for " + this.destination, e); 803 } 804 } 805 806 protected void releaseLocalAsyncLock() { 807 this.localDestinationSemaphore.release(); 808 } 809 810 @Override 811 public String toString(){ 812 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 813 } 814 } 815 816 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 817 private final AtomicInteger subscriptionCount = new AtomicInteger(); 818 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 819 super(destination); 820 this.subscriptionCount.set(getAllSubscriptions().length); 821 if (isConcurrentStoreAndDispatchTopics()) { 822 asyncTopicMaps.add(asyncTaskMap); 823 } 824 } 825 826 @Override 827 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 828 throws IOException { 829 if (isConcurrentStoreAndDispatchTopics()) { 830 message.beforeMarshall(wireFormat); 831 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 832 result.aquireLocks(); 833 addTopicTask(this, result); 834 return result.getFuture(); 835 } else { 836 return super.asyncAddTopicMessage(context, message); 837 } 838 } 839 840 @Override 841 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 842 MessageId messageId, MessageAck ack) throws IOException { 843 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 844 if (isConcurrentStoreAndDispatchTopics()) { 845 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 846 StoreTopicTask task = null; 847 synchronized (asyncTaskMap) { 848 task = (StoreTopicTask) asyncTaskMap.get(key); 849 } 850 if (task != null) { 851 if (task.addSubscriptionKey(subscriptionKey)) { 852 removeTopicTask(this, messageId); 853 if (task.cancel()) { 854 synchronized (asyncTaskMap) { 855 asyncTaskMap.remove(key); 856 } 857 } 858 } 859 } else { 860 doAcknowledge(context, subscriptionKey, messageId, ack); 861 } 862 } else { 863 doAcknowledge(context, subscriptionKey, messageId, ack); 864 } 865 } 866 867 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 868 throws IOException { 869 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 870 command.setDestination(dest); 871 command.setSubscriptionKey(subscriptionKey); 872 command.setMessageId(messageId.toProducerKey()); 873 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 874 if (ack != null && ack.isUnmatchedAck()) { 875 command.setAck(UNMATCHED); 876 } else { 877 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 878 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 879 } 880 store(command, false, null, null); 881 } 882 883 @Override 884 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 885 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 886 .getSubscriptionName()); 887 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 888 command.setDestination(dest); 889 command.setSubscriptionKey(subscriptionKey.toString()); 890 command.setRetroactive(retroactive); 891 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 892 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 893 store(command, isEnableJournalDiskSyncs() && true, null, null); 894 this.subscriptionCount.incrementAndGet(); 895 } 896 897 @Override 898 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 899 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 900 command.setDestination(dest); 901 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 902 store(command, isEnableJournalDiskSyncs() && true, null, null); 903 this.subscriptionCount.decrementAndGet(); 904 } 905 906 @Override 907 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 908 909 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 910 indexLock.writeLock().lock(); 911 try { 912 pageFile.tx().execute(new Transaction.Closure<IOException>() { 913 @Override 914 public void execute(Transaction tx) throws IOException { 915 StoredDestination sd = getStoredDestination(dest, tx); 916 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 917 .hasNext();) { 918 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 919 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 920 .getValue().getSubscriptionInfo().newInput())); 921 subscriptions.add(info); 922 923 } 924 } 925 }); 926 } finally { 927 indexLock.writeLock().unlock(); 928 } 929 930 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 931 subscriptions.toArray(rc); 932 return rc; 933 } 934 935 @Override 936 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 937 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 938 indexLock.writeLock().lock(); 939 try { 940 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 941 @Override 942 public SubscriptionInfo execute(Transaction tx) throws IOException { 943 StoredDestination sd = getStoredDestination(dest, tx); 944 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 945 if (command == null) { 946 return null; 947 } 948 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 949 .getSubscriptionInfo().newInput())); 950 } 951 }); 952 } finally { 953 indexLock.writeLock().unlock(); 954 } 955 } 956 957 @Override 958 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 959 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 960 indexLock.writeLock().lock(); 961 try { 962 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 963 @Override 964 public Integer execute(Transaction tx) throws IOException { 965 StoredDestination sd = getStoredDestination(dest, tx); 966 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 967 if (cursorPos == null) { 968 // The subscription might not exist. 969 return 0; 970 } 971 972 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 973 } 974 }); 975 } finally { 976 indexLock.writeLock().unlock(); 977 } 978 } 979 980 @Override 981 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 982 throws Exception { 983 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 984 @SuppressWarnings("unused") 985 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 986 indexLock.writeLock().lock(); 987 try { 988 pageFile.tx().execute(new Transaction.Closure<Exception>() { 989 @Override 990 public void execute(Transaction tx) throws Exception { 991 StoredDestination sd = getStoredDestination(dest, tx); 992 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 993 sd.orderIndex.setBatch(tx, cursorPos); 994 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 995 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 996 .hasNext();) { 997 Entry<Long, MessageKeys> entry = iterator.next(); 998 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 999 continue; 1000 } 1001 listener.recoverMessage(loadMessage(entry.getValue().location)); 1002 } 1003 sd.orderIndex.resetCursorPosition(); 1004 } 1005 }); 1006 } finally { 1007 indexLock.writeLock().unlock(); 1008 } 1009 } 1010 1011 @Override 1012 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 1013 final MessageRecoveryListener listener) throws Exception { 1014 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1015 @SuppressWarnings("unused") 1016 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1017 indexLock.writeLock().lock(); 1018 try { 1019 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1020 @Override 1021 public void execute(Transaction tx) throws Exception { 1022 StoredDestination sd = getStoredDestination(dest, tx); 1023 sd.orderIndex.resetCursorPosition(); 1024 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 1025 if (moc == null) { 1026 LastAck pos = getLastAck(tx, sd, subscriptionKey); 1027 if (pos == null) { 1028 // sub deleted 1029 return; 1030 } 1031 sd.orderIndex.setBatch(tx, pos); 1032 moc = sd.orderIndex.cursor; 1033 } else { 1034 sd.orderIndex.cursor.sync(moc); 1035 } 1036 1037 Entry<Long, MessageKeys> entry = null; 1038 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 1039 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 1040 .hasNext();) { 1041 entry = iterator.next(); 1042 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1043 continue; 1044 } 1045 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 1046 counter++; 1047 } 1048 if (counter >= maxReturned || listener.hasSpace() == false) { 1049 break; 1050 } 1051 } 1052 sd.orderIndex.stoppedIterating(); 1053 if (entry != null) { 1054 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1055 sd.subscriptionCursors.put(subscriptionKey, copy); 1056 } 1057 } 1058 }); 1059 } finally { 1060 indexLock.writeLock().unlock(); 1061 } 1062 } 1063 1064 @Override 1065 public void resetBatching(String clientId, String subscriptionName) { 1066 try { 1067 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1068 indexLock.writeLock().lock(); 1069 try { 1070 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1071 @Override 1072 public void execute(Transaction tx) throws IOException { 1073 StoredDestination sd = getStoredDestination(dest, tx); 1074 sd.subscriptionCursors.remove(subscriptionKey); 1075 } 1076 }); 1077 }finally { 1078 indexLock.writeLock().unlock(); 1079 } 1080 } catch (IOException e) { 1081 throw new RuntimeException(e); 1082 } 1083 } 1084 } 1085 1086 String subscriptionKey(String clientId, String subscriptionName) { 1087 return clientId + ":" + subscriptionName; 1088 } 1089 1090 @Override 1091 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1092 String key = key(convert(destination)); 1093 MessageStore store = storeCache.get(key(convert(destination))); 1094 if (store == null) { 1095 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1096 synchronized (storeCache) { 1097 store = storeCache.put(key, queueStore); 1098 if (store != null) { 1099 storeCache.put(key, store); 1100 } else { 1101 store = queueStore; 1102 } 1103 } 1104 } 1105 1106 return store; 1107 } 1108 1109 @Override 1110 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1111 String key = key(convert(destination)); 1112 MessageStore store = storeCache.get(key(convert(destination))); 1113 if (store == null) { 1114 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1115 synchronized (storeCache) { 1116 store = storeCache.put(key, topicStore); 1117 if (store != null) { 1118 storeCache.put(key, store); 1119 } else { 1120 store = topicStore; 1121 } 1122 } 1123 } 1124 1125 return (TopicMessageStore) store; 1126 } 1127 1128 /** 1129 * Cleanup method to remove any state associated with the given destination. 1130 * This method does not stop the message store (it might not be cached). 1131 * 1132 * @param destination 1133 * Destination to forget 1134 */ 1135 @Override 1136 public void removeQueueMessageStore(ActiveMQQueue destination) { 1137 } 1138 1139 /** 1140 * Cleanup method to remove any state associated with the given destination 1141 * This method does not stop the message store (it might not be cached). 1142 * 1143 * @param destination 1144 * Destination to forget 1145 */ 1146 @Override 1147 public void removeTopicMessageStore(ActiveMQTopic destination) { 1148 } 1149 1150 @Override 1151 public void deleteAllMessages() throws IOException { 1152 deleteAllMessages = true; 1153 } 1154 1155 @Override 1156 public Set<ActiveMQDestination> getDestinations() { 1157 try { 1158 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1159 indexLock.writeLock().lock(); 1160 try { 1161 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1162 @Override 1163 public void execute(Transaction tx) throws IOException { 1164 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1165 .hasNext();) { 1166 Entry<String, StoredDestination> entry = iterator.next(); 1167 //Removing isEmpty topic check - see AMQ-5875 1168 rc.add(convert(entry.getKey())); 1169 } 1170 } 1171 }); 1172 }finally { 1173 indexLock.writeLock().unlock(); 1174 } 1175 return rc; 1176 } catch (IOException e) { 1177 throw new RuntimeException(e); 1178 } 1179 } 1180 1181 @Override 1182 public long getLastMessageBrokerSequenceId() throws IOException { 1183 return 0; 1184 } 1185 1186 @Override 1187 public long getLastProducerSequenceId(ProducerId id) { 1188 indexLock.writeLock().lock(); 1189 try { 1190 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1191 } finally { 1192 indexLock.writeLock().unlock(); 1193 } 1194 } 1195 1196 @Override 1197 public long size() { 1198 try { 1199 return journalSize.get() + getPageFile().getDiskSize(); 1200 } catch (IOException e) { 1201 throw new RuntimeException(e); 1202 } 1203 } 1204 1205 @Override 1206 public void beginTransaction(ConnectionContext context) throws IOException { 1207 throw new IOException("Not yet implemented."); 1208 } 1209 @Override 1210 public void commitTransaction(ConnectionContext context) throws IOException { 1211 throw new IOException("Not yet implemented."); 1212 } 1213 @Override 1214 public void rollbackTransaction(ConnectionContext context) throws IOException { 1215 throw new IOException("Not yet implemented."); 1216 } 1217 1218 @Override 1219 public void checkpoint(boolean sync) throws IOException { 1220 super.checkpointCleanup(sync); 1221 } 1222 1223 // ///////////////////////////////////////////////////////////////// 1224 // Internal helper methods. 1225 // ///////////////////////////////////////////////////////////////// 1226 1227 /** 1228 * @param location 1229 * @return 1230 * @throws IOException 1231 */ 1232 Message loadMessage(Location location) throws IOException { 1233 try { 1234 JournalCommand<?> command = load(location); 1235 KahaAddMessageCommand addMessage = null; 1236 switch (command.type()) { 1237 case KAHA_UPDATE_MESSAGE_COMMAND: 1238 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1239 break; 1240 case KAHA_ADD_MESSAGE_COMMAND: 1241 addMessage = (KahaAddMessageCommand) command; 1242 break; 1243 default: 1244 throw new IOException("Could not load journal record, unexpected command type: " + command.type() + " at location: " + location); 1245 } 1246 if (!addMessage.hasMessage()) { 1247 throw new IOException("Could not load journal record, null message content at location: " + location); 1248 } 1249 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1250 return msg; 1251 } catch (Throwable t) { 1252 IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t); 1253 LOG.error("Failed to load message at: {}", location , ioe); 1254 brokerService.handleIOException(ioe); 1255 throw ioe; 1256 } 1257 } 1258 1259 // ///////////////////////////////////////////////////////////////// 1260 // Internal conversion methods. 1261 // ///////////////////////////////////////////////////////////////// 1262 1263 KahaLocation convert(Location location) { 1264 KahaLocation rc = new KahaLocation(); 1265 rc.setLogId(location.getDataFileId()); 1266 rc.setOffset(location.getOffset()); 1267 return rc; 1268 } 1269 1270 KahaDestination convert(ActiveMQDestination dest) { 1271 KahaDestination rc = new KahaDestination(); 1272 rc.setName(dest.getPhysicalName()); 1273 switch (dest.getDestinationType()) { 1274 case ActiveMQDestination.QUEUE_TYPE: 1275 rc.setType(DestinationType.QUEUE); 1276 return rc; 1277 case ActiveMQDestination.TOPIC_TYPE: 1278 rc.setType(DestinationType.TOPIC); 1279 return rc; 1280 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1281 rc.setType(DestinationType.TEMP_QUEUE); 1282 return rc; 1283 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1284 rc.setType(DestinationType.TEMP_TOPIC); 1285 return rc; 1286 default: 1287 return null; 1288 } 1289 } 1290 1291 ActiveMQDestination convert(String dest) { 1292 int p = dest.indexOf(":"); 1293 if (p < 0) { 1294 throw new IllegalArgumentException("Not in the valid destination format"); 1295 } 1296 int type = Integer.parseInt(dest.substring(0, p)); 1297 String name = dest.substring(p + 1); 1298 return convert(type, name); 1299 } 1300 1301 private ActiveMQDestination convert(KahaDestination commandDestination) { 1302 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1303 } 1304 1305 private ActiveMQDestination convert(int type, String name) { 1306 switch (KahaDestination.DestinationType.valueOf(type)) { 1307 case QUEUE: 1308 return new ActiveMQQueue(name); 1309 case TOPIC: 1310 return new ActiveMQTopic(name); 1311 case TEMP_QUEUE: 1312 return new ActiveMQTempQueue(name); 1313 case TEMP_TOPIC: 1314 return new ActiveMQTempTopic(name); 1315 default: 1316 throw new IllegalArgumentException("Not in the valid destination format"); 1317 } 1318 } 1319 1320 public TransactionIdTransformer getTransactionIdTransformer() { 1321 return transactionIdTransformer; 1322 } 1323 1324 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1325 this.transactionIdTransformer = transactionIdTransformer; 1326 } 1327 1328 static class AsyncJobKey { 1329 MessageId id; 1330 ActiveMQDestination destination; 1331 1332 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1333 this.id = id; 1334 this.destination = destination; 1335 } 1336 1337 @Override 1338 public boolean equals(Object obj) { 1339 if (obj == this) { 1340 return true; 1341 } 1342 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1343 && destination.equals(((AsyncJobKey) obj).destination); 1344 } 1345 1346 @Override 1347 public int hashCode() { 1348 return id.hashCode() + destination.hashCode(); 1349 } 1350 1351 @Override 1352 public String toString() { 1353 return destination.getPhysicalName() + "-" + id; 1354 } 1355 } 1356 1357 public interface StoreTask { 1358 public boolean cancel(); 1359 1360 public void aquireLocks(); 1361 1362 public void releaseLocks(); 1363 } 1364 1365 class StoreQueueTask implements Runnable, StoreTask { 1366 protected final Message message; 1367 protected final ConnectionContext context; 1368 protected final KahaDBMessageStore store; 1369 protected final InnerFutureTask future; 1370 protected final AtomicBoolean done = new AtomicBoolean(); 1371 protected final AtomicBoolean locked = new AtomicBoolean(); 1372 1373 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1374 this.store = store; 1375 this.context = context; 1376 this.message = message; 1377 this.future = new InnerFutureTask(this); 1378 } 1379 1380 public ListenableFuture<Object> getFuture() { 1381 return this.future; 1382 } 1383 1384 @Override 1385 public boolean cancel() { 1386 if (this.done.compareAndSet(false, true)) { 1387 return this.future.cancel(false); 1388 } 1389 return false; 1390 } 1391 1392 @Override 1393 public void aquireLocks() { 1394 if (this.locked.compareAndSet(false, true)) { 1395 try { 1396 globalQueueSemaphore.acquire(); 1397 store.acquireLocalAsyncLock(); 1398 message.incrementReferenceCount(); 1399 } catch (InterruptedException e) { 1400 LOG.warn("Failed to aquire lock", e); 1401 } 1402 } 1403 1404 } 1405 1406 @Override 1407 public void releaseLocks() { 1408 if (this.locked.compareAndSet(true, false)) { 1409 store.releaseLocalAsyncLock(); 1410 globalQueueSemaphore.release(); 1411 message.decrementReferenceCount(); 1412 } 1413 } 1414 1415 @Override 1416 public void run() { 1417 this.store.doneTasks++; 1418 try { 1419 if (this.done.compareAndSet(false, true)) { 1420 this.store.addMessage(context, message); 1421 removeQueueTask(this.store, this.message.getMessageId()); 1422 this.future.complete(); 1423 } else if (cancelledTaskModMetric > 0 && (++this.store.canceledTasks) % cancelledTaskModMetric == 0) { 1424 System.err.println(this.store.dest.getName() + " cancelled: " 1425 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1426 this.store.canceledTasks = this.store.doneTasks = 0; 1427 } 1428 } catch (Throwable t) { 1429 this.future.setException(t); 1430 removeQueueTask(this.store, this.message.getMessageId()); 1431 } 1432 } 1433 1434 protected Message getMessage() { 1435 return this.message; 1436 } 1437 1438 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1439 1440 private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); 1441 1442 public InnerFutureTask(Runnable runnable) { 1443 super(runnable, null); 1444 } 1445 1446 public void setException(final Throwable e) { 1447 super.setException(e); 1448 } 1449 1450 public void complete() { 1451 super.set(null); 1452 } 1453 1454 @Override 1455 public void done() { 1456 fireListener(); 1457 } 1458 1459 @Override 1460 public void addListener(Runnable listener) { 1461 this.listenerRef.set(listener); 1462 if (isDone()) { 1463 fireListener(); 1464 } 1465 } 1466 1467 private void fireListener() { 1468 Runnable listener = listenerRef.getAndSet(null); 1469 if (listener != null) { 1470 try { 1471 listener.run(); 1472 } catch (Exception ignored) { 1473 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1474 } 1475 } 1476 } 1477 } 1478 } 1479 1480 class StoreTopicTask extends StoreQueueTask { 1481 private final int subscriptionCount; 1482 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1483 private final KahaDBTopicMessageStore topicStore; 1484 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1485 int subscriptionCount) { 1486 super(store, context, message); 1487 this.topicStore = store; 1488 this.subscriptionCount = subscriptionCount; 1489 1490 } 1491 1492 @Override 1493 public void aquireLocks() { 1494 if (this.locked.compareAndSet(false, true)) { 1495 try { 1496 globalTopicSemaphore.acquire(); 1497 store.acquireLocalAsyncLock(); 1498 message.incrementReferenceCount(); 1499 } catch (InterruptedException e) { 1500 LOG.warn("Failed to aquire lock", e); 1501 } 1502 } 1503 } 1504 1505 @Override 1506 public void releaseLocks() { 1507 if (this.locked.compareAndSet(true, false)) { 1508 message.decrementReferenceCount(); 1509 store.releaseLocalAsyncLock(); 1510 globalTopicSemaphore.release(); 1511 } 1512 } 1513 1514 /** 1515 * add a key 1516 * 1517 * @param key 1518 * @return true if all acknowledgements received 1519 */ 1520 public boolean addSubscriptionKey(String key) { 1521 synchronized (this.subscriptionKeys) { 1522 this.subscriptionKeys.add(key); 1523 } 1524 return this.subscriptionKeys.size() >= this.subscriptionCount; 1525 } 1526 1527 @Override 1528 public void run() { 1529 this.store.doneTasks++; 1530 try { 1531 if (this.done.compareAndSet(false, true)) { 1532 this.topicStore.addMessage(context, message); 1533 // apply any acks we have 1534 synchronized (this.subscriptionKeys) { 1535 for (String key : this.subscriptionKeys) { 1536 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1537 1538 } 1539 } 1540 removeTopicTask(this.topicStore, this.message.getMessageId()); 1541 this.future.complete(); 1542 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1543 System.err.println(this.store.dest.getName() + " cancelled: " 1544 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1545 this.store.canceledTasks = this.store.doneTasks = 0; 1546 } 1547 } catch (Throwable t) { 1548 this.future.setException(t); 1549 removeTopicTask(this.topicStore, this.message.getMessageId()); 1550 } 1551 } 1552 } 1553 1554 public class StoreTaskExecutor extends ThreadPoolExecutor { 1555 1556 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1557 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1558 } 1559 1560 @Override 1561 protected void afterExecute(Runnable runnable, Throwable throwable) { 1562 super.afterExecute(runnable, throwable); 1563 1564 if (runnable instanceof StoreTask) { 1565 ((StoreTask)runnable).releaseLocks(); 1566 } 1567 } 1568 } 1569 1570 @Override 1571 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1572 return new JobSchedulerStoreImpl(); 1573 } 1574}