001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import static org.apache.activemq.transaction.Transaction.IN_USE_STATE; 020import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Comparator; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.LinkedHashMap; 029import java.util.LinkedHashSet; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.CancellationException; 035import java.util.concurrent.ConcurrentLinkedQueue; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.DelayQueue; 038import java.util.concurrent.Delayed; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.concurrent.locks.Lock; 045import 
java.util.concurrent.locks.ReentrantLock; 046import java.util.concurrent.locks.ReentrantReadWriteLock; 047 048import javax.jms.InvalidSelectorException; 049import javax.jms.JMSException; 050import javax.jms.ResourceAllocationException; 051 052import org.apache.activemq.broker.BrokerService; 053import org.apache.activemq.broker.BrokerStoppedException; 054import org.apache.activemq.broker.ConnectionContext; 055import org.apache.activemq.broker.ProducerBrokerExchange; 056import org.apache.activemq.broker.region.cursors.OrderedPendingList; 057import org.apache.activemq.broker.region.cursors.PendingList; 058import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 059import org.apache.activemq.broker.region.cursors.PrioritizedPendingList; 060import org.apache.activemq.broker.region.cursors.QueueDispatchPendingList; 061import org.apache.activemq.broker.region.cursors.StoreQueueCursor; 062import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 063import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory; 064import org.apache.activemq.broker.region.group.MessageGroupMap; 065import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 066import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 067import org.apache.activemq.broker.region.policy.DispatchPolicy; 068import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 069import org.apache.activemq.broker.util.InsertionCountList; 070import org.apache.activemq.command.ActiveMQDestination; 071import org.apache.activemq.command.ActiveMQMessage; 072import org.apache.activemq.command.ConsumerId; 073import org.apache.activemq.command.ExceptionResponse; 074import org.apache.activemq.command.Message; 075import org.apache.activemq.command.MessageAck; 076import org.apache.activemq.command.MessageDispatchNotification; 077import org.apache.activemq.command.MessageId; 078import org.apache.activemq.command.ProducerAck; 079import 
org.apache.activemq.command.ProducerInfo; 080import org.apache.activemq.command.RemoveInfo; 081import org.apache.activemq.command.Response; 082import org.apache.activemq.filter.BooleanExpression; 083import org.apache.activemq.filter.MessageEvaluationContext; 084import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 085import org.apache.activemq.selector.SelectorParser; 086import org.apache.activemq.state.ProducerState; 087import org.apache.activemq.store.IndexListener; 088import org.apache.activemq.store.ListenableFuture; 089import org.apache.activemq.store.MessageRecoveryListener; 090import org.apache.activemq.store.MessageStore; 091import org.apache.activemq.thread.Task; 092import org.apache.activemq.thread.TaskRunner; 093import org.apache.activemq.thread.TaskRunnerFactory; 094import org.apache.activemq.transaction.Synchronization; 095import org.apache.activemq.usage.Usage; 096import org.apache.activemq.usage.UsageListener; 097import org.apache.activemq.util.BrokerSupport; 098import org.apache.activemq.util.ThreadPoolUtils; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101import org.slf4j.MDC; 102 103/** 104 * The Queue is a List of MessageEntry objects that are dispatched to matching 105 * subscriptions. 
106 */ 107public class Queue extends BaseDestination implements Task, UsageListener, IndexListener { 108 protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); 109 protected final TaskRunnerFactory taskFactory; 110 protected TaskRunner taskRunner; 111 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 112 protected final List<Subscription> consumers = new ArrayList<Subscription>(50); 113 private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock(); 114 protected PendingMessageCursor messages; 115 private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); 116 private final PendingList pagedInMessages = new OrderedPendingList(); 117 // Messages that are paged in but have not yet been targeted at a subscription 118 private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); 119 protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList(); 120 private AtomicInteger pendingSends = new AtomicInteger(0); 121 private MessageGroupMap messageGroupOwners; 122 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); 123 private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); 124 final Lock sendLock = new ReentrantLock(); 125 private ExecutorService executor; 126 private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>(); 127 private boolean useConsumerPriority = true; 128 private boolean strictOrderDispatch = false; 129 private final QueueDispatchSelector dispatchSelector; 130 private boolean optimizedDispatch = false; 131 private boolean iterationRunning = false; 132 private boolean firstConsumer = false; 133 private int timeBeforeDispatchStarts = 0; 134 private int consumersBeforeDispatchStarts = 0; 135 private CountDownLatch consumersBeforeStartsLatch; 136 private final AtomicLong pendingWakeups = new AtomicLong(); 
137 private boolean allConsumersExclusiveByDefault = false; 138 139 private boolean resetNeeded; 140 141 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 142 @Override 143 public void run() { 144 asyncWakeup(); 145 } 146 }; 147 private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false); 148 private final Runnable expireMessagesWork = new Runnable() { 149 @Override 150 public void run() { 151 expireMessages(); 152 expiryTaskInProgress.set(false); 153 } 154 }; 155 156 private final Runnable expireMessagesTask = new Runnable() { 157 @Override 158 public void run() { 159 if (expiryTaskInProgress.compareAndSet(false, true)) { 160 taskFactory.execute(expireMessagesWork); 161 } 162 } 163 }; 164 165 private final Object iteratingMutex = new Object(); 166 167 // gate on enabling cursor cache to ensure no outstanding sync 168 // send before async sends resume 169 public boolean singlePendingSend() { 170 return pendingSends.get() <= 1; 171 } 172 173 class TimeoutMessage implements Delayed { 174 175 Message message; 176 ConnectionContext context; 177 long trigger; 178 179 public TimeoutMessage(Message message, ConnectionContext context, long delay) { 180 this.message = message; 181 this.context = context; 182 this.trigger = System.currentTimeMillis() + delay; 183 } 184 185 @Override 186 public long getDelay(TimeUnit unit) { 187 long n = trigger - System.currentTimeMillis(); 188 return unit.convert(n, TimeUnit.MILLISECONDS); 189 } 190 191 @Override 192 public int compareTo(Delayed delayed) { 193 long other = ((TimeoutMessage) delayed).trigger; 194 int returnValue; 195 if (this.trigger < other) { 196 returnValue = -1; 197 } else if (this.trigger > other) { 198 returnValue = 1; 199 } else { 200 returnValue = 0; 201 } 202 return returnValue; 203 } 204 } 205 206 DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>(); 207 208 class FlowControlTimeoutTask extends Thread { 209 210 @Override 211 public void 
run() { 212 TimeoutMessage timeout; 213 try { 214 while (true) { 215 timeout = flowControlTimeoutMessages.take(); 216 if (timeout != null) { 217 synchronized (messagesWaitingForSpace) { 218 if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) { 219 ExceptionResponse response = new ExceptionResponse( 220 new ResourceAllocationException( 221 "Usage Manager Memory Limit Wait Timeout. Stopping producer (" 222 + timeout.message.getProducerId() 223 + ") to prevent flooding " 224 + getActiveMQDestination().getQualifiedName() 225 + "." 226 + " See http://activemq.apache.org/producer-flow-control.html for more info")); 227 response.setCorrelationId(timeout.message.getCommandId()); 228 timeout.context.getConnection().dispatchAsync(response); 229 } 230 } 231 } 232 } 233 } catch (InterruptedException e) { 234 LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping"); 235 } 236 } 237 } 238 239 private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask(); 240 241 private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() { 242 243 @Override 244 public int compare(Subscription s1, Subscription s2) { 245 // We want the list sorted in descending order 246 int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); 247 if (val == 0 && messageGroupOwners != null) { 248 // then ascending order of assigned message groups to favour less loaded consumers 249 // Long.compare in jdk7 250 long x = s1.getConsumerInfo().getAssignedGroupCount(destination); 251 long y = s2.getConsumerInfo().getAssignedGroupCount(destination); 252 val = (x < y) ? -1 : ((x == y) ? 
0 : 1); 253 } 254 return val; 255 } 256 }; 257 258 public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, 259 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 260 super(brokerService, store, destination, parentStats); 261 this.taskFactory = taskFactory; 262 this.dispatchSelector = new QueueDispatchSelector(destination); 263 if (store != null) { 264 store.registerIndexListener(this); 265 } 266 } 267 268 @Override 269 public List<Subscription> getConsumers() { 270 consumersLock.readLock().lock(); 271 try { 272 return new ArrayList<Subscription>(consumers); 273 } finally { 274 consumersLock.readLock().unlock(); 275 } 276 } 277 278 // make the queue easily visible in the debugger from its task runner 279 // threads 280 final class QueueThread extends Thread { 281 final Queue queue; 282 283 public QueueThread(Runnable runnable, String name, Queue queue) { 284 super(runnable, name); 285 this.queue = queue; 286 } 287 } 288 289 class BatchMessageRecoveryListener implements MessageRecoveryListener { 290 final LinkedList<Message> toExpire = new LinkedList<Message>(); 291 final double totalMessageCount; 292 int recoveredAccumulator = 0; 293 int currentBatchCount; 294 295 BatchMessageRecoveryListener(int totalMessageCount) { 296 this.totalMessageCount = totalMessageCount; 297 currentBatchCount = recoveredAccumulator; 298 } 299 300 @Override 301 public boolean recoverMessage(Message message) { 302 recoveredAccumulator++; 303 if ((recoveredAccumulator % 10000) == 0) { 304 LOG.info("cursor for {} has recovered {} messages. {}% complete", new Object[]{ getActiveMQDestination().getQualifiedName(), recoveredAccumulator, new Integer((int) (recoveredAccumulator * 100 / totalMessageCount))}); 305 } 306 // Message could have expired while it was being 307 // loaded.. 
308 message.setRegionDestination(Queue.this); 309 if (message.isExpired() && broker.isExpired(message)) { 310 toExpire.add(message); 311 return true; 312 } 313 if (hasSpace()) { 314 messagesLock.writeLock().lock(); 315 try { 316 try { 317 messages.addMessageLast(message); 318 } catch (Exception e) { 319 LOG.error("Failed to add message to cursor", e); 320 } 321 } finally { 322 messagesLock.writeLock().unlock(); 323 } 324 destinationStatistics.getMessages().increment(); 325 return true; 326 } 327 return false; 328 } 329 330 @Override 331 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 332 throw new RuntimeException("Should not be called."); 333 } 334 335 @Override 336 public boolean hasSpace() { 337 return true; 338 } 339 340 @Override 341 public boolean isDuplicate(MessageId id) { 342 return false; 343 } 344 345 public void reset() { 346 currentBatchCount = recoveredAccumulator; 347 } 348 349 public void processExpired() { 350 for (Message message: toExpire) { 351 messageExpired(createConnectionContext(), createMessageReference(message)); 352 // drop message will decrement so counter 353 // balance here 354 destinationStatistics.getMessages().increment(); 355 } 356 toExpire.clear(); 357 } 358 359 public boolean done() { 360 return currentBatchCount == recoveredAccumulator; 361 } 362 363 public boolean canRecoveryNextMessage() { 364 return true; 365 } 366 367 } 368 369 @Override 370 public void setPrioritizedMessages(boolean prioritizedMessages) { 371 super.setPrioritizedMessages(prioritizedMessages); 372 dispatchPendingList.setPrioritizedMessages(prioritizedMessages); 373 } 374 375 @Override 376 public void initialize() throws Exception { 377 378 if (this.messages == null) { 379 if (destination.isTemporary() || broker == null || store == null) { 380 this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); 381 } else { 382 this.messages = new StoreQueueCursor(broker, this); 383 } 384 } 385 386 // If a 
VMPendingMessageCursor don't use the default Producer System 387 // Usage 388 // since it turns into a shared blocking queue which can lead to a 389 // network deadlock. 390 // If we are cursoring to disk..it's not and issue because it does not 391 // block due 392 // to large disk sizes. 393 if (messages instanceof VMPendingMessageCursor) { 394 this.systemUsage = brokerService.getSystemUsage(); 395 memoryUsage.setParent(systemUsage.getMemoryUsage()); 396 } 397 398 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); 399 400 super.initialize(); 401 if (store != null) { 402 // Restore the persistent messages. 403 messages.setSystemUsage(systemUsage); 404 messages.setEnableAudit(isEnableAudit()); 405 messages.setMaxAuditDepth(getMaxAuditDepth()); 406 messages.setMaxProducersToAudit(getMaxProducersToAudit()); 407 messages.setUseCache(isUseCache()); 408 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 409 final int messageCount = store.getMessageCount(); 410 if (messageCount > 0 && messages.isRecoveryRequired()) { 411 BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount); 412 do { 413 listener.reset(); 414 store.recoverNextMessages(getMaxPageSize(), listener); 415 listener.processExpired(); 416 } while (!listener.done()); 417 } else { 418 destinationStatistics.getMessages().add(messageCount); 419 } 420 } 421 } 422 423 /* 424 * Holder for subscription that needs attention on next iterate browser 425 * needs access to existing messages in the queue that have already been 426 * dispatched 427 */ 428 class BrowserDispatch { 429 QueueBrowserSubscription browser; 430 431 public BrowserDispatch(QueueBrowserSubscription browserSubscription) { 432 browser = browserSubscription; 433 browser.incrementQueueRef(); 434 } 435 436 public QueueBrowserSubscription getBrowser() { 437 return browser; 438 } 439 } 440 441 ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new 
ConcurrentLinkedQueue<BrowserDispatch>(); 442 443 @Override 444 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 445 LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() }); 446 447 super.addSubscription(context, sub); 448 // synchronize with dispatch method so that no new messages are sent 449 // while setting up a subscription. avoid out of order messages, 450 // duplicates, etc. 451 pagedInPendingDispatchLock.writeLock().lock(); 452 try { 453 454 sub.add(context, this); 455 456 // needs to be synchronized - so no contention with dispatching 457 // consumersLock. 458 consumersLock.writeLock().lock(); 459 try { 460 // set a flag if this is a first consumer 461 if (consumers.size() == 0) { 462 firstConsumer = true; 463 if (consumersBeforeDispatchStarts != 0) { 464 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); 465 } 466 } else { 467 if (consumersBeforeStartsLatch != null) { 468 consumersBeforeStartsLatch.countDown(); 469 } 470 } 471 472 addToConsumerList(sub); 473 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) { 474 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 475 if (exclusiveConsumer == null) { 476 exclusiveConsumer = sub; 477 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE || 478 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) { 479 exclusiveConsumer = sub; 480 } 481 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 482 } 483 } finally { 484 consumersLock.writeLock().unlock(); 485 } 486 487 if (sub instanceof QueueBrowserSubscription) { 488 // tee up for dispatch in next iterate 489 QueueBrowserSubscription 
browserSubscription = (QueueBrowserSubscription) sub; 490 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); 491 browserDispatches.add(browserDispatch); 492 } 493 494 if (!this.optimizedDispatch) { 495 wakeup(); 496 } 497 } finally { 498 pagedInPendingDispatchLock.writeLock().unlock(); 499 } 500 if (this.optimizedDispatch) { 501 // Outside of dispatchLock() to maintain the lock hierarchy of 502 // iteratingMutex -> dispatchLock. - see 503 // https://issues.apache.org/activemq/browse/AMQ-1878 504 wakeup(); 505 } 506 } 507 508 @Override 509 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) 510 throws Exception { 511 super.removeSubscription(context, sub, lastDeliveredSequenceId); 512 // synchronize with dispatch method so that no new messages are sent 513 // while removing up a subscription. 514 pagedInPendingDispatchLock.writeLock().lock(); 515 try { 516 LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{ 517 getActiveMQDestination().getQualifiedName(), 518 sub, 519 lastDeliveredSequenceId, 520 getDestinationStatistics().getDequeues().getCount(), 521 getDestinationStatistics().getDispatched().getCount(), 522 getDestinationStatistics().getInflight().getCount(), 523 sub.getConsumerInfo().getAssignedGroupCount(destination) 524 }); 525 consumersLock.writeLock().lock(); 526 try { 527 removeFromConsumerList(sub); 528 if (sub.getConsumerInfo().isExclusive()) { 529 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 530 if (exclusiveConsumer == sub) { 531 exclusiveConsumer = null; 532 for (Subscription s : consumers) { 533 if (s.getConsumerInfo().isExclusive() 534 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer 535 .getConsumerInfo().getPriority())) { 536 exclusiveConsumer = s; 537 538 } 539 } 540 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 541 } 
542 } else if (isAllConsumersExclusiveByDefault()) { 543 Subscription exclusiveConsumer = null; 544 for (Subscription s : consumers) { 545 if (exclusiveConsumer == null 546 || s.getConsumerInfo().getPriority() > exclusiveConsumer 547 .getConsumerInfo().getPriority()) { 548 exclusiveConsumer = s; 549 } 550 } 551 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 552 } 553 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); 554 getMessageGroupOwners().removeConsumer(consumerId); 555 556 // redeliver inflight messages 557 558 boolean markAsRedelivered = false; 559 MessageReference lastDeliveredRef = null; 560 List<MessageReference> unAckedMessages = sub.remove(context, this); 561 562 // locate last redelivered in unconsumed list (list in delivery rather than seq order) 563 if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) { 564 for (MessageReference ref : unAckedMessages) { 565 if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) { 566 lastDeliveredRef = ref; 567 markAsRedelivered = true; 568 LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId()); 569 break; 570 } 571 } 572 } 573 574 for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) { 575 MessageReference ref = unackedListIterator.next(); 576 // AMQ-5107: don't resend if the broker is shutting down 577 if ( this.brokerService.isStopping() ) { 578 break; 579 } 580 QueueMessageReference qmr = (QueueMessageReference) ref; 581 if (qmr.getLockOwner() == sub) { 582 qmr.unlock(); 583 584 // have no delivery information 585 if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) { 586 qmr.incrementRedeliveryCounter(); 587 } else { 588 if (markAsRedelivered) { 589 qmr.incrementRedeliveryCounter(); 590 } 591 if (ref == lastDeliveredRef) { 592 // all that follow were not redelivered 593 markAsRedelivered = false; 594 } 595 } 596 } 597 if 
(qmr.isDropped()) { 598 unackedListIterator.remove(); 599 } 600 } 601 dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty()); 602 if (sub instanceof QueueBrowserSubscription) { 603 ((QueueBrowserSubscription)sub).decrementQueueRef(); 604 browserDispatches.remove(sub); 605 } 606 // AMQ-5107: don't resend if the broker is shutting down 607 if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) { 608 doDispatch(new OrderedPendingList()); 609 } 610 } finally { 611 consumersLock.writeLock().unlock(); 612 } 613 if (!this.optimizedDispatch) { 614 wakeup(); 615 } 616 } finally { 617 pagedInPendingDispatchLock.writeLock().unlock(); 618 } 619 if (this.optimizedDispatch) { 620 // Outside of dispatchLock() to maintain the lock hierarchy of 621 // iteratingMutex -> dispatchLock. - see 622 // https://issues.apache.org/activemq/browse/AMQ-1878 623 wakeup(); 624 } 625 } 626 627 private volatile ResourceAllocationException sendMemAllocationException = null; 628 @Override 629 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 630 final ConnectionContext context = producerExchange.getConnectionContext(); 631 // There is delay between the client sending it and it arriving at the 632 // destination.. it may have expired. 
// (body of send(...); the method header and the declaration of 'context' are on the preceding source line)
    message.setRegionDestination(this);
    ProducerState state = producerExchange.getProducerState();
    if (state == null) {
        LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange);
        throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
    }
    final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
    // A ProducerAck is only sent for sends that need no response, from a producer with a
    // positive window size, while the connection is not in recovery mode.
    final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
            && !context.isInRecoveryMode();
    if (message.isExpired()) {
        // message not stored - or added to stats yet - so chuck here
        broker.getRoot().messageExpired(context, message, null);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
        return;
    }
    if (memoryUsage.isFull()) {
        // notification hooks; both are defined outside this view
        isFull(context, memoryUsage);
        fastProducer(context, producerInfo);
        if (isProducerFlowControl() && context.isProducerFlowControl()) {
            // same message either way; only the log level depends on isFlowControlLogRequired()
            if (isFlowControlLogRequired()) {
                LOG.warn("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                        memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
            } else {
                LOG.debug("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                        memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
            }
            if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                // Fail fast. The exception instance is created once, stored in the volatile
                // field sendMemAllocationException (re-checked under the monitor) and the
                // same instance is thrown on every later occurrence.
                ResourceAllocationException resourceAllocationException = sendMemAllocationException;
                if (resourceAllocationException == null) {
                    synchronized (this) {
                        resourceAllocationException = sendMemAllocationException;
                        if (resourceAllocationException == null) {
                            sendMemAllocationException = resourceAllocationException = new ResourceAllocationException("Usage Manager Memory Limit reached on "
                                    + getActiveMQDestination().getQualifiedName() + "."
                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
                        }
                    }
                }
                throw resourceAllocationException;
            }

            // We can avoid blocking due to low usage if the producer is
            // sending a sync message or if it is using a producer window
            if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                // copy the exchange state since the context will be
                // modified while we are waiting for space.
                final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
                synchronized (messagesWaitingForSpace) {
                    // Start flow control timeout task
                    // Prevent trying to start it multiple times
                    if (!flowControlTimeoutTask.isAlive()) {
                        flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
                        flowControlTimeoutTask.start();
                    }
                    // Park the send: the Runnable registered here performs the actual send
                    // (and answers the producer) when it is run later.
                    messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
                        @Override
                        public void run() {

                            try {
                                // While waiting for space to free up... the
                                // transaction may be done
                                if (message.isInTransaction()) {
                                    if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
                                        throw new JMSException("Send transaction completed while waiting for space");
                                    }
                                }

                                // the message may have expired.
                                if (message.isExpired()) {
                                    LOG.error("message expired waiting for space");
                                    broker.messageExpired(context, message, null);
                                    destinationStatistics.getExpired().increment();
                                } else {
                                    doMessageSend(producerExchangeCopy, message);
                                }

                                // answer the producer: a ProducerAck for windowed async sends,
                                // otherwise a plain Response correlated to the send command
                                if (sendProducerAck) {
                                    ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
                                            .getSize());
                                    context.getConnection().dispatchAsync(ack);
                                } else {
                                    Response response = new Response();
                                    response.setCorrelationId(message.getCommandId());
                                    context.getConnection().dispatchAsync(response);
                                }

                            } catch (Exception e) {
                                // report the failure to the producer only when a response is
                                // expected and neither recovery nor broker shutdown is under way
                                if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) {
                                    ExceptionResponse response = new ExceptionResponse(e);
                                    response.setCorrelationId(message.getCommandId());
                                    context.getConnection().dispatchAsync(response);
                                } else {
                                    LOG.debug("unexpected exception on deferred send of: {}", message, e);
                                }
                            } finally {
                                // balances the increment / blockingOnFlowControl(true) done at registration
                                getDestinationStatistics().getBlockedSends().decrement();
                                producerExchangeCopy.blockingOnFlowControl(false);
                            }
                        }
                    });

                    getDestinationStatistics().getBlockedSends().increment();
                    producerExchange.blockingOnFlowControl(true);
                    // optionally schedule a timeout that fails the parked send (see FlowControlTimeoutTask)
                    if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
                        flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
                                .getSendFailIfNoSpaceAfterTimeout()));
                    }

                    registerCallbackForNotFullNotification();
                    // the response is produced by the parked Runnable, so suppress it for this call
                    context.setDontSendReponse(true);
                    return;
                }

            } else {

                // NOTE(review): waitForSpace is defined outside this view; presumably it
                // blocks the calling thread until memory is available - confirm.
                if (memoryUsage.isFull()) {
                    waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
                            + message.getProducerId() + ") stopped to prevent flooding "
                            + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // The usage manager could have delayed us by the time
                // we unblock the message could have expired..
                if (message.isExpired()) {
                    LOG.debug("Expired message: {}", message);
                    broker.getRoot().messageExpired(context, message, null);
                    return;
                }
            }
        }
    }
    // normal path: enqueue now and acknowledge the producer if required
    doMessageSend(producerExchange, message);
    if (sendProducerAck) {
        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
        context.getConnection().dispatchAsync(ack);
    }
}

/**
 * Asks the memory usage tracker to run {@code sendMessagesWaitingForSpaceTask} once memory
 * is no longer full; if the tracker declines to register the callback, the task is run
 * immediately instead.
 */
private void registerCallbackForNotFullNotification() {
    // If the usage manager is not full, then the task will not
    // get called..
    if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
        // so call it directly here.
782 sendMessagesWaitingForSpaceTask.run(); 783 } 784 } 785 786 private final LinkedList<MessageContext> indexOrderedCursorUpdates = new LinkedList<>(); 787 788 @Override 789 public void onAdd(MessageContext messageContext) { 790 synchronized (indexOrderedCursorUpdates) { 791 indexOrderedCursorUpdates.addLast(messageContext); 792 } 793 } 794 795 public void rollbackPendingCursorAdditions(MessageId messageId) { 796 synchronized (indexOrderedCursorUpdates) { 797 for (int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) { 798 MessageContext mc = indexOrderedCursorUpdates.get(i); 799 if (mc.message.getMessageId().equals(messageId)) { 800 indexOrderedCursorUpdates.remove(mc); 801 if (mc.onCompletion != null) { 802 mc.onCompletion.run(); 803 } 804 break; 805 } 806 } 807 } 808 } 809 810 private void doPendingCursorAdditions() throws Exception { 811 LinkedList<MessageContext> orderedUpdates = new LinkedList<>(); 812 sendLock.lockInterruptibly(); 813 try { 814 synchronized (indexOrderedCursorUpdates) { 815 MessageContext candidate = indexOrderedCursorUpdates.peek(); 816 while (candidate != null && candidate.message.getMessageId().getFutureOrSequenceLong() != null) { 817 candidate = indexOrderedCursorUpdates.removeFirst(); 818 // check for duplicate adds suppressed by the store 819 if (candidate.message.getMessageId().getFutureOrSequenceLong() instanceof Long && ((Long)candidate.message.getMessageId().getFutureOrSequenceLong()).compareTo(-1l) == 0) { 820 LOG.warn("{} messageStore indicated duplicate add attempt for {}, suppressing duplicate dispatch", this, candidate.message.getMessageId()); 821 } else { 822 orderedUpdates.add(candidate); 823 } 824 candidate = indexOrderedCursorUpdates.peek(); 825 } 826 } 827 messagesLock.writeLock().lock(); 828 try { 829 for (MessageContext messageContext : orderedUpdates) { 830 if (!messages.addMessageLast(messageContext.message)) { 831 // cursor suppressed a duplicate 832 messageContext.duplicate = true; 833 } 834 if 
(messageContext.onCompletion != null) { 835 messageContext.onCompletion.run(); 836 } 837 } 838 } finally { 839 messagesLock.writeLock().unlock(); 840 } 841 } finally { 842 sendLock.unlock(); 843 } 844 for (MessageContext messageContext : orderedUpdates) { 845 if (!messageContext.duplicate) { 846 messageSent(messageContext.context, messageContext.message); 847 } 848 } 849 orderedUpdates.clear(); 850 } 851 852 final class CursorAddSync extends Synchronization { 853 854 private final MessageContext messageContext; 855 856 CursorAddSync(MessageContext messageContext) { 857 this.messageContext = messageContext; 858 this.messageContext.message.incrementReferenceCount(); 859 } 860 861 @Override 862 public void afterCommit() throws Exception { 863 if (store != null && messageContext.message.isPersistent()) { 864 doPendingCursorAdditions(); 865 } else { 866 cursorAdd(messageContext.message); 867 messageSent(messageContext.context, messageContext.message); 868 } 869 messageContext.message.decrementReferenceCount(); 870 } 871 872 @Override 873 public void afterRollback() throws Exception { 874 if (store != null && messageContext.message.isPersistent()) { 875 rollbackPendingCursorAdditions(messageContext.message.getMessageId()); 876 } 877 messageContext.message.decrementReferenceCount(); 878 } 879 } 880 881 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, 882 Exception { 883 final ConnectionContext context = producerExchange.getConnectionContext(); 884 ListenableFuture<Object> result = null; 885 886 producerExchange.incrementSend(); 887 pendingSends.incrementAndGet(); 888 checkUsage(context, producerExchange, message); 889 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 890 if (store != null && message.isPersistent()) { 891 message.getMessageId().setFutureOrSequenceLong(null); 892 try { 893 //AMQ-6133 - don't store async if using persistJMSRedelivered 894 //This flag causes a sync update later 
// on dispatch which can cause a race
                //condition if the original add is processed after the update, which can cause
                //a duplicate message to be stored
                if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
                    result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
                    result.addListener(new PendingMarshalUsageTracker(message));
                } else {
                    store.addMessage(context, message);
                }
                if (isReduceMemoryFootprint()) {
                    message.clearMarshalledState();
                }
            } catch (Exception e) {
                // we may have a store in inconsistent state, so reset the cursor
                // before restarting normal broker operations
                resetNeeded = true;
                pendingSends.decrementAndGet();
                rollbackPendingCursorAdditions(message.getMessageId());
                throw e;
            }
        }
        orderedCursorAdd(message, context);
        // only wait on the async store write when the producer expects a response
        if (result != null && message.isResponseRequired() && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }

    /**
     * Adds the message to the cursor by one of three routes: deferred to transaction
     * completion when the context is in a transaction, through the ordered pending
     * updates for persistent messages with a store, or directly otherwise.
     *
     * @param message the message to add
     * @param context connection context of the send
     * @throws Exception if the cursor add or the sent notification fails
     */
    private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception {
        if (context.isInTransaction()) {
            context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null)));
        } else if (store != null && message.isPersistent()) {
            doPendingCursorAdditions();
        } else {
            // no ordering issue with non persistent messages
            cursorAdd(message);
            messageSent(context, message);
        }
    }

    /**
     * Calls {@code waitForSpace} on the producer when the relevant usage is full: store
     * usage (against the high water mark) for persistent messages, temp usage otherwise.
     *
     * @param context                connection context of the send
     * @param producerBrokerExchange exchange of the sending producer
     * @param message                the message being sent
     * @throws ResourceAllocationException declared by the signature; presumably raised by {@code waitForSpace}
     * @throws IOException                 declared by the signature
     * @throws InterruptedException        declared by the signature
     */
    private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
        if (message.isPersistent()) {
            if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
                        + message.getProducerId() + ") to prevent flooding "
                        + getActiveMQDestination().getQualifiedName() + "."
                        + " See http://activemq.apache.org/producer-flow-control.html for more info";

                waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
        } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
            // NOTE(review): the fullness check reads systemUsage while the wait below uses
            // messages.getSystemUsage() - confirm both refer to the same usage instance
            final String logMessage = "Temp Store is Full ("
                    + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
                    +"). Stopping producer (" + message.getProducerId()
                    + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                    + " See http://activemq.apache.org/producer-flow-control.html for more info";

            waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage);
        }
    }

    /**
     * Runs a browse of up to {@code getMaxExpirePageSize()} messages into a counting
     * list and then requests an asynchronous wakeup.
     */
    private void expireMessages() {
        LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName());

        // just track the insertion count
        List<Message> browsedMessages = new InsertionCountList<Message>();
        doBrowse(browsedMessages, this.getMaxExpirePageSize());
        asyncWakeup();
        LOG.debug("{} expiring messages done.", getActiveMQDestination().getQualifiedName());
    }

    /**
     * No-op for this destination type.
     */
    @Override
    public void gc() {
    }

    /**
     * Marks the message as consumed and, for a persistent message with a store present,
     * asks the store to remove it asynchronously.
     *
     * @param context connection context of the acknowledgement
     * @param sub     acknowledging subscription (not used in this method)
     * @param ack     the acknowledgement, converted to a non-ranged ack for the store
     * @param node    the acknowledged message reference
     * @throws IOException if the store removal fails
     */
    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
            throws IOException {
        messageConsumed(context, node);
        if (store != null && node.isPersistent()) {
            store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
        }
    }

    /**
     * Loads a message from the store and binds it to this destination.
     *
     * @param messageId id of the message to load
     * @return the message, or {@code null} when there is no store or no such message
     * @throws IOException if the store read fails
     */
    Message loadMessage(MessageId messageId) throws IOException {
        Message msg = null;
        if (store != null) { // can be null for a temp q
            msg = store.getMessage(messageId);
            if (msg != null) {
msg.setRegionDestination(this);
            }
        }
        return msg;
    }

    /**
     * @return the qualified destination name followed by subscription count, memory
     *         usage percentage, message count and number of pending cursor updates
     */
    @Override
    public String toString() {
        return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
                + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + destinationStatistics.getMessages().getCount() + ", pending="
                + indexOrderedCursorUpdates.size();
    }

    /**
     * Starts the queue once: starts the usage trackers, registers this queue as a memory
     * usage listener, starts the cursor, schedules the expiry task when an expiry period
     * is configured, and triggers an initial page-in.
     *
     * @throws Exception if any of the started components fails
     */
    @Override
    public void start() throws Exception {
        // compareAndSet makes repeated start() calls no-ops
        if (started.compareAndSet(false, true)) {
            if (memoryUsage != null) {
                memoryUsage.start();
            }
            if (systemUsage.getStoreUsage() != null) {
                systemUsage.getStoreUsage().start();
            }
            systemUsage.getMemoryUsage().addUsageListener(this);
            messages.start();
            if (getExpireMessagesPeriod() > 0) {
                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
            }
            doPageIn(false);
        }
    }

    /**
     * Stops the queue once: shuts down the task runner and executor, cancels the expiry
     * task, interrupts the flow-control timeout thread, stops the cursor, releases the
     * references held by paged-in messages, and stops memory usage and the store.
     *
     * @throws Exception if any of the stopped components fails
     */
    @Override
    public void stop() throws Exception {
        // compareAndSet makes repeated stop() calls no-ops
        if (started.compareAndSet(true, false)) {
            if (taskRunner != null) {
                taskRunner.shutdown();
            }
            if (this.executor != null) {
                ThreadPoolUtils.shutdownNow(executor);
                executor = null;
            }

            scheduler.cancel(expireMessagesTask);

            if (flowControlTimeoutTask.isAlive()) {
                flowControlTimeoutTask.interrupt();
            }

            if (messages != null) {
                messages.stop();
            }

            // release the reference each paged-in entry holds before clearing the map
            for (MessageReference messageReference : pagedInMessages.values()) {
                messageReference.decrementReferenceCount();
            }
            pagedInMessages.clear();

            systemUsage.getMemoryUsage().removeUsageListener(this);
            if (memoryUsage != null) {
                memoryUsage.stop();
            }
            if (store != null) {
                store.stop();
            }
        }
    }

    // Properties
    // -------------------------------------------------------------------------
    /**
     * @return the destination this queue serves
     */
    @Override
    public ActiveMQDestination getActiveMQDestination() {
        return destination;
    }

    public MessageGroupMap
getMessageGroupOwners() {
        // created lazily on first use from the configured factory
        if (messageGroupOwners == null) {
            messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
            messageGroupOwners.setDestination(this);
        }
        return messageGroupOwners;
    }

    /** @return the configured dispatch policy */
    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    /** @param dispatchPolicy the dispatch policy to use */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    /** @return the factory used to create the message group map */
    public MessageGroupMapFactory getMessageGroupMapFactory() {
        return messageGroupMapFactory;
    }

    /** @param messageGroupMapFactory the factory used to create the message group map */
    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
        this.messageGroupMapFactory = messageGroupMapFactory;
    }

    /** @return the pending message cursor of this queue */
    public PendingMessageCursor getMessages() {
        return this.messages;
    }

    /** @param messages the pending message cursor to use */
    public void setMessages(PendingMessageCursor messages) {
        this.messages = messages;
    }

    /** @return the {@code useConsumerPriority} setting */
    public boolean isUseConsumerPriority() {
        return useConsumerPriority;
    }

    /** @param useConsumerPriority the {@code useConsumerPriority} setting */
    public void setUseConsumerPriority(boolean useConsumerPriority) {
        this.useConsumerPriority = useConsumerPriority;
    }

    /** @return the {@code strictOrderDispatch} setting */
    public boolean isStrictOrderDispatch() {
        return strictOrderDispatch;
    }

    /** @param strictOrderDispatch the {@code strictOrderDispatch} setting */
    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
        this.strictOrderDispatch = strictOrderDispatch;
    }

    /** @return the {@code optimizedDispatch} setting */
    public boolean isOptimizedDispatch() {
        return optimizedDispatch;
    }

    /** @param optimizedDispatch the {@code optimizedDispatch} setting */
    public void setOptimizedDispatch(boolean optimizedDispatch) {
        this.optimizedDispatch = optimizedDispatch;
    }

    /** @return the time to wait before dispatch starts; used as milliseconds by {@code iterate()} */
    public int getTimeBeforeDispatchStarts() {
        return timeBeforeDispatchStarts;
    }

    /** @param timeBeforeDispatchStarts the time to wait before dispatch starts */
    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
    }

    /** @return the number of consumers to wait for before dispatch starts */
    public int getConsumersBeforeDispatchStarts() {
        return consumersBeforeDispatchStarts;
    }

    public void
setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
    }

    /** @param allConsumersExclusiveByDefault the {@code allConsumersExclusiveByDefault} setting */
    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
    }

    /** @return the {@code allConsumersExclusiveByDefault} setting */
    public boolean isAllConsumersExclusiveByDefault() {
        return allConsumersExclusiveByDefault;
    }

    /** @return whether a cursor reset has been flagged, e.g. after a failed store write */
    public boolean isResetNeeded() {
        return resetNeeded;
    }

    // Implementation methods
    // -------------------------------------------------------------------------
    /**
     * Wraps a message in an {@link IndirectMessageReference}.
     *
     * @param message the message to wrap
     * @return the new reference
     */
    private QueueMessageReference createMessageReference(Message message) {
        QueueMessageReference result = new IndirectMessageReference(message);
        return result;
    }

    /**
     * @return up to {@code getMaxBrowsePageSize()} messages currently browsable on this queue
     */
    @Override
    public Message[] browse() {
        List<Message> browseList = new ArrayList<Message>();
        doBrowse(browseList, getMaxBrowsePageSize());
        return browseList.toArray(new Message[browseList.size()]);
    }

    /**
     * Pages in messages as needed and collects up to {@code max} messages into
     * {@code browseList}, first from the dispatch-pending list and then from the
     * paged-in messages. Failures are logged rather than propagated.
     *
     * @param browseList list that receives the browsed messages
     * @param max        maximum number of messages to collect
     */
    public void doBrowse(List<Message> browseList, int max) {
        final ConnectionContext connectionContext = createConnectionContext();
        try {
            // bound the number of page-in rounds by the cursor size relative to max
            int maxPageInAttempts = 1;
            if (max > 0) {
                messagesLock.readLock().lock();
                try {
                    maxPageInAttempts += (messages.size() / max);
                } finally {
                    messagesLock.readLock().unlock();
                }
                while (shouldPageInMoreForBrowse(max) && maxPageInAttempts-- > 0) {
                    pageInMessages(!memoryUsage.isFull(110), max);
                }
            }
            doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch");
            doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages");

            // we need a store iterator to walk messages on disk, independent of the cursor which is tracking
            // the next message batch
        } catch
(BrokerStoppedException ignored) {
            // broker is shutting down; nothing to report
        } catch (Exception e) {
            LOG.error("Problem retrieving message for browse", e);
        }
    }

    /**
     * Copies browsable messages from {@code list} into {@code browseList} under the read
     * lock, then processes the entries that looked expired: those the broker confirms as
     * expired are expired, the rest are removed from {@code list} under the write lock
     * and have their reference count decremented.
     *
     * @param browseList        list that receives the browsed messages
     * @param max               maximum size of {@code browseList}
     * @param list              source list to browse
     * @param lock              lock guarding {@code list}
     * @param connectionContext context used when expiring messages
     * @param name              label for {@code list} used in debug logging
     * @throws Exception if collecting or expiring fails
     */
    protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception {
        List<MessageReference> toExpire = new ArrayList<MessageReference>();
        lock.readLock().lock();
        try {
            addAll(list.values(), browseList, max, toExpire);
        } finally {
            lock.readLock().unlock();
        }
        for (MessageReference ref : toExpire) {
            if (broker.isExpired(ref)) {
                LOG.debug("expiring from {}: {}", name, ref);
                messageExpired(connectionContext, ref);
            } else {
                lock.writeLock().lock();
                try {
                    list.remove(ref);
                } finally {
                    lock.writeLock().unlock();
                }
                ref.decrementReferenceCount();
            }
        }
    }

    /**
     * Decides whether a browse should page in more messages.
     *
     * @param max the browse page size
     * @return {@code true} when nothing is paged in yet, or when fewer than {@code max}
     *         messages are paged in, more remain in the queue and the cursor has space
     */
    private boolean shouldPageInMoreForBrowse(int max) {
        int alreadyPagedIn = 0;
        pagedInMessagesLock.readLock().lock();
        try {
            alreadyPagedIn = pagedInMessages.size();
        } finally {
            pagedInMessagesLock.readLock().unlock();
        }
        int messagesInQueue = alreadyPagedIn;
        messagesLock.readLock().lock();
        try {
            messagesInQueue += messages.size();
        } finally {
            messagesLock.readLock().unlock();
        }

        LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
        // && binds tighter than ||: the first test alone is sufficient
        return (alreadyPagedIn == 0 || (alreadyPagedIn < max)
                && (alreadyPagedIn < messagesInQueue)
                && messages.hasSpace());
    }

    /**
     * Walks {@code refs} until {@code l} holds {@code max} messages, routing expired,
     * unlocked references to {@code toExpire} and adding the messages of unacked
     * references to {@code l} without duplicates.
     *
     * @param refs     references to inspect
     * @param l        list that receives browsable messages
     * @param max      maximum size of {@code l}
     * @param toExpire list that receives references found expired
     * @throws Exception declared by the signature
     */
    private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
            List<MessageReference> toExpire) throws Exception {
        for (Iterator<?
extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) { 1235 QueueMessageReference ref = (QueueMessageReference) i.next(); 1236 if (ref.isExpired() && (ref.getLockOwner() == null)) { 1237 toExpire.add(ref); 1238 } else if (!ref.isAcked() && l.contains(ref.getMessage()) == false) { 1239 l.add(ref.getMessage()); 1240 } 1241 } 1242 } 1243 1244 public QueueMessageReference getMessage(String id) { 1245 MessageId msgId = new MessageId(id); 1246 pagedInMessagesLock.readLock().lock(); 1247 try { 1248 QueueMessageReference ref = (QueueMessageReference)this.pagedInMessages.get(msgId); 1249 if (ref != null) { 1250 return ref; 1251 } 1252 } finally { 1253 pagedInMessagesLock.readLock().unlock(); 1254 } 1255 messagesLock.writeLock().lock(); 1256 try{ 1257 try { 1258 messages.reset(); 1259 while (messages.hasNext()) { 1260 MessageReference mr = messages.next(); 1261 QueueMessageReference qmr = createMessageReference(mr.getMessage()); 1262 qmr.decrementReferenceCount(); 1263 messages.rollback(qmr.getMessageId()); 1264 if (msgId.equals(qmr.getMessageId())) { 1265 return qmr; 1266 } 1267 } 1268 } finally { 1269 messages.release(); 1270 } 1271 }finally { 1272 messagesLock.writeLock().unlock(); 1273 } 1274 return null; 1275 } 1276 1277 public void purge() throws Exception { 1278 ConnectionContext c = createConnectionContext(); 1279 List<MessageReference> list = null; 1280 try { 1281 sendLock.lock(); 1282 long originalMessageCount = this.destinationStatistics.getMessages().getCount(); 1283 do { 1284 doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. 
1285 pagedInMessagesLock.readLock().lock(); 1286 try { 1287 list = new ArrayList<MessageReference>(pagedInMessages.values()); 1288 }finally { 1289 pagedInMessagesLock.readLock().unlock(); 1290 } 1291 1292 for (MessageReference ref : list) { 1293 try { 1294 QueueMessageReference r = (QueueMessageReference) ref; 1295 removeMessage(c, r); 1296 messages.rollback(r.getMessageId()); 1297 } catch (IOException e) { 1298 } 1299 } 1300 // don't spin/hang if stats are out and there is nothing left in the 1301 // store 1302 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); 1303 1304 if (this.destinationStatistics.getMessages().getCount() > 0) { 1305 LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); 1306 } 1307 } finally { 1308 sendLock.unlock(); 1309 } 1310 } 1311 1312 @Override 1313 public void clearPendingMessages(int pendingAdditionsCount) { 1314 messagesLock.writeLock().lock(); 1315 try { 1316 final ActiveMQMessage dummyPersistent = new ActiveMQMessage(); 1317 dummyPersistent.setPersistent(true); 1318 for (int i=0; i<pendingAdditionsCount; i++) { 1319 try { 1320 // track the increase in the cursor size w/o reverting to the store 1321 messages.addMessageFirst(dummyPersistent); 1322 } catch (Exception ignored) { 1323 LOG.debug("Unexpected exception on tracking pending message additions", ignored); 1324 } 1325 } 1326 if (resetNeeded) { 1327 messages.gc(); 1328 messages.reset(); 1329 resetNeeded = false; 1330 } else { 1331 messages.rebase(); 1332 } 1333 asyncWakeup(); 1334 } finally { 1335 messagesLock.writeLock().unlock(); 1336 } 1337 } 1338 1339 /** 1340 * Removes the message matching the given messageId 1341 */ 1342 public boolean removeMessage(String messageId) throws Exception { 1343 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0; 1344 } 1345 1346 /** 1347 * Removes the 
messages matching the given selector 1348 * 1349 * @return the number of messages removed 1350 */ 1351 public int removeMatchingMessages(String selector) throws Exception { 1352 return removeMatchingMessages(selector, -1); 1353 } 1354 1355 /** 1356 * Removes the messages matching the given selector up to the maximum number 1357 * of matched messages 1358 * 1359 * @return the number of messages removed 1360 */ 1361 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 1362 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages); 1363 } 1364 1365 /** 1366 * Removes the messages matching the given filter up to the maximum number 1367 * of matched messages 1368 * 1369 * @return the number of messages removed 1370 */ 1371 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { 1372 int movedCounter = 0; 1373 Set<MessageReference> set = new LinkedHashSet<MessageReference>(); 1374 ConnectionContext context = createConnectionContext(); 1375 do { 1376 doPageIn(true); 1377 pagedInMessagesLock.readLock().lock(); 1378 try { 1379 if (!set.addAll(pagedInMessages.values())) { 1380 // nothing new to check - mem constraint on page in 1381 return movedCounter; 1382 }; 1383 } finally { 1384 pagedInMessagesLock.readLock().unlock(); 1385 } 1386 List<MessageReference> list = new ArrayList<MessageReference>(set); 1387 for (MessageReference ref : list) { 1388 IndirectMessageReference r = (IndirectMessageReference) ref; 1389 if (filter.evaluate(context, r)) { 1390 1391 removeMessage(context, r); 1392 set.remove(r); 1393 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1394 return movedCounter; 1395 } 1396 } 1397 } 1398 } while (set.size() < this.destinationStatistics.getMessages().getCount()); 1399 return movedCounter; 1400 } 1401 1402 /** 1403 * Copies the message matching the given messageId 1404 */ 1405 public boolean copyMessageTo(ConnectionContext context, String 
messageId, ActiveMQDestination dest)
            throws Exception {
        return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
    }

    /**
     * Copies the messages matching the given selector
     *
     * @return the number of messages copied
     */
    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
            throws Exception {
        // -1 means "no upper bound"
        return copyMatchingMessagesTo(context, selector, dest, -1);
    }

    /**
     * Copies the messages matching the given selector up to the maximum number
     * of matched messages
     *
     * @return the number of messages copied
     */
    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
            int maximumMessages) throws Exception {
        return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
    }

    /**
     * Copies the messages matching the given filter up to the maximum number of
     * matched messages. Copying to this queue's own destination is a no-op.
     *
     * @param context         connection context used for the resend
     * @param filter          filter selecting the messages to copy
     * @param dest            destination to copy to
     * @param maximumMessages upper bound on copies; only enforced when positive
     * @return the number of messages copied
     */
    public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
            int maximumMessages) throws Exception {

        if (destination.equals(dest)) {
            return 0;
        }

        int movedCounter = 0;
        int count = 0;
        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
        do {
            doPageIn(true, false, (messages.isCacheEnabled() || !broker.getBrokerService().isPersistent()) ?
messages.size() : getMaxBrowsePageSize());
            pagedInMessagesLock.readLock().lock();
            try {
                if (!set.addAll(pagedInMessages.values())) {
                    // nothing new to check - mem constraint on page in
                    return movedCounter;
                }
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<MessageReference> list = new ArrayList<MessageReference>(set);
            for (MessageReference ref : list) {
                IndirectMessageReference r = (IndirectMessageReference) ref;
                if (filter.evaluate(context, r)) {

                    // hold a reference for the duration of the resend
                    r.incrementReferenceCount();
                    try {
                        Message m = r.getMessage();
                        BrokerSupport.resend(context, m, dest);
                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                            return movedCounter;
                        }
                    } finally {
                        r.decrementReferenceCount();
                    }
                }
                count++;
            }
        } while (count < this.destinationStatistics.getMessages().getCount());
        return movedCounter;
    }

    /**
     * Moves a message to another destination: pauses dispatch on the target queues,
     * resends the message there, removes it from this queue, and resumes dispatch.
     * When this queue is a DLQ, the dead letter strategy of the message's original
     * destination is also rolled back for the message.
     *
     * @param context
     *            connection context
     * @param m
     *            reference to the message to move
     * @param dest
     *            destination to move the message to
     * @return always {@code true} when no exception is thrown
     * @throws Exception if the resend or the removal fails
     */
    public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
        Set<Destination> destsToPause = regionBroker.getDestinations(dest);
        try {
            for (Destination d: destsToPause) {
                if (d instanceof Queue) {
                    ((Queue)d).pauseDispatch();
                }
            }
            BrokerSupport.resend(context, m.getMessage(), dest);
            removeMessage(context, m);
            messagesLock.writeLock().lock();
            try {
                messages.rollback(m.getMessageId());
                if (isDLQ()) {
                    ActiveMQDestination originalDestination = m.getMessage().getOriginalDestination();
                    if (originalDestination != null) {
                        // note: this loop variable shadows the destination field
                        for (Destination destination : regionBroker.getDestinations(originalDestination)) {
                            DeadLetterStrategy strategy = destination.getDeadLetterStrategy();
strategy.rollback(m.getMessage());
                        }
                    }
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
        } finally {
            // always resume the queues paused above, even when the move failed
            for (Destination d: destsToPause) {
                if (d instanceof Queue) {
                    ((Queue)d).resumeDispatch();
                }
            }
        }

        return true;
    }

    /**
     * Moves the message matching the given messageId
     *
     * @return {@code true} if a message was moved
     */
    public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
            throws Exception {
        return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
    }

    /**
     * Moves the messages matching the given selector
     *
     * @return the number of messages moved
     */
    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
            throws Exception {
        // Integer.MAX_VALUE rather than -1: the filter overload also bounds its loop by maximumMessages
        return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
    }

    /**
     * Moves the messages matching the given selector up to the maximum number
     * of matched messages
     *
     * @return the number of messages moved
     */
    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
            int maximumMessages) throws Exception {
        return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
    }

    /**
     * Moves the messages matching the given filter up to the maximum number of
     * matched messages. Moving to this queue's own destination is a no-op.
     *
     * @return the number of messages moved
     */
    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
            ActiveMQDestination dest, int maximumMessages) throws Exception {

        if (destination.equals(dest)) {
            return 0;
        }

        int movedCounter = 0;
        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try {
                if (!set.addAll(pagedInMessages.values())) {
                    // nothing new to check - mem constraint on page in
return movedCounter;
                }
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
            // iterate over a snapshot because moved messages are removed from the set
            List<MessageReference> list = new ArrayList<MessageReference>(set);
            for (MessageReference ref : list) {
                if (filter.evaluate(context, ref)) {
                    // We should only move messages that can be locked.
                    moveMessageTo(context, (QueueMessageReference)ref, dest);
                    set.remove(ref);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
        return movedCounter;
    }

    /**
     * Moves messages on this dead letter queue back to their original destinations.
     * Messages without an original destination are left in place.
     *
     * @param context         connection context used for the moves
     * @param maximumMessages upper bound on restored messages; only enforced when positive
     * @return the number of messages restored
     * @throws Exception if this queue is not a DLQ, or if a move fails
     */
    public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception {
        if (!isDLQ()) {
            throw new Exception("Retry of message is only possible on Dead Letter Queues!");
        }
        int restoredCounter = 0;
        // ensure we deal with a snapshot to avoid potential duplicates in the event of messages
        // getting immediate dlq'ed
        long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount();
        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try {
                if (!set.addAll(pagedInMessages.values())) {
                    // nothing new to check - mem constraint on page in
                    return restoredCounter;
                }
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<MessageReference> list = new ArrayList<MessageReference>(set);
            for (MessageReference ref : list) {
                numberOfRetryAttemptsToCheckAllMessagesOnce--;
                if (ref.getMessage().getOriginalDestination() != null) {

                    moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
                    set.remove(ref);
                    if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
                        return restoredCounter;
                    }
                }
            }
        }
while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount());
        return restoredCounter;
    }

    /**
     * One pass of the queue's task: runs sends that were waiting for memory space,
     * applies the optional start-up delay for the first consumer, pages in messages,
     * and feeds any registered browsers.
     *
     * @return true if we would like to iterate again
     * @see org.apache.activemq.thread.Task#iterate()
     */
    @Override
    public boolean iterate() {
        MDC.put("activemq.destination", getName());
        boolean pageInMoreMessages = false;
        synchronized (iteratingMutex) {

            // If optimize dispatch is on or this is a slave this method could be called recursively
            // we set this state value to short-circuit wakeup in those cases to avoid that as it
            // could lead to errors.
            iterationRunning = true;

            // do early to allow dispatch of these waiting messages
            synchronized (messagesWaitingForSpace) {
                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
                while (it.hasNext()) {
                    if (!memoryUsage.isFull()) {
                        Runnable op = it.next();
                        it.remove();
                        op.run();
                    } else {
                        // still full: ask to be called back when space frees up, and stop draining
                        registerCallbackForNotFullNotification();
                        break;
                    }
                }
            }

            if (firstConsumer) {
                firstConsumer = false;
                try {
                    if (consumersBeforeDispatchStarts > 0) {
                        int timeout = 1000; // wait one second by default if
                                            // consumer count isn't reached
                        if (timeBeforeDispatchStarts > 0) {
                            timeout = timeBeforeDispatchStarts;
                        }
                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                            LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                        } else {
                            LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                        }
                    }
                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                        iteratingMutex.wait(timeBeforeDispatchStarts);
                        LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                    }
                } catch (Exception e) {
                    // NOTE(review): this also catches InterruptedException without restoring
                    // the interrupt flag, and logs no stack trace - confirm this is intended
                    LOG.error(e.toString());
                }
            }

            messagesLock.readLock().lock();
            try{
                pageInMoreMessages |= !messages.isEmpty();
            } finally {
                messagesLock.readLock().unlock();
            }

            pagedInPendingDispatchLock.readLock().lock();
            try {
                pageInMoreMessages |= !dispatchPendingList.isEmpty();
            } finally {
                pagedInPendingDispatchLock.readLock().unlock();
            }

            boolean hasBrowsers = !browserDispatches.isEmpty();

            if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
                try {
                    pageInMessages(hasBrowsers && getMaxBrowsePageSize() > 0, getMaxPageSize());
                } catch (Throwable e) {
                    LOG.error("Failed to page in more queue messages ", e);
                }
            }

            if (hasBrowsers) {
                // snapshot the paged-in messages so browsers are fed outside the lock
                PendingList messagesInMemory = isPrioritizedMessages() ?
                        new PrioritizedPendingList() : new OrderedPendingList();
                pagedInMessagesLock.readLock().lock();
                try {
                    messagesInMemory.addAll(pagedInMessages);
                } finally {
                    pagedInMessagesLock.readLock().unlock();
                }

                Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
                while (browsers.hasNext()) {
                    BrowserDispatch browserDispatch = browsers.next();
                    try {
                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                        msgContext.setDestination(destination);

                        QueueBrowserSubscription browser = browserDispatch.getBrowser();

                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size());
                        boolean added = false;
                        for (MessageReference node : messagesInMemory) {
                            if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                                msgContext.setMessageReference(node);
                                if (browser.matches(node, msgContext)) {
                                    browser.add(node);
                                    added =
true;
                                }
                            }
                        }
                        // are we done browsing? no new messages paged
                        if (!added || browser.atMax()) {
                            browser.decrementQueueRef();
                            browserDispatches.remove(browserDispatch);
                        } else {
                            wakeup();
                        }
                    } catch (Exception e) {
                        LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
                    }
                }
            }

            // consume one pending wakeup for this pass
            if (pendingWakeups.get() > 0) {
                pendingWakeups.decrementAndGet();
            }
            MDC.remove("activemq.destination");
            iterationRunning = false;

            // ask to run again while wakeups are still outstanding
            return pendingWakeups.get() > 0;
        }
    }

    /**
     * Pauses dispatch through the dispatch selector.
     */
    public void pauseDispatch() {
        dispatchSelector.pause();
    }

    /**
     * Resumes dispatch through the dispatch selector and wakes the queue up.
     */
    public void resumeDispatch() {
        dispatchSelector.resume();
        wakeup();
    }

    /**
     * @return whether the dispatch selector reports dispatch as paused
     */
    public boolean isDispatchPaused() {
        return dispatchSelector.isPaused();
    }

    /**
     * Creates a filter that accepts the reference whose message id, in string form,
     * equals the given id.
     *
     * @param messageId string form of the message id to match
     * @return the filter
     */
    protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
        return new MessageReferenceFilter() {
            @Override
            public boolean evaluate(ConnectionContext context, MessageReference r) {
                return messageId.equals(r.getMessageId().toString());
            }

            @Override
            public String toString() {
                return "MessageIdFilter: " + messageId;
            }
        };
    }

    /**
     * Creates a filter from a selector expression. A {@code null} or empty selector
     * yields a filter that accepts every message.
     *
     * @param selector the selector text, may be {@code null} or empty
     * @return the filter
     * @throws InvalidSelectorException if the selector cannot be parsed
     */
    protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {

        if (selector == null || selector.isEmpty()) {
            return new MessageReferenceFilter() {

                @Override
                public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
                    return true;
                }
            };
        }

        final BooleanExpression selectorExpression = SelectorParser.parse(selector);

        return new MessageReferenceFilter() {
            @Override
            public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
                MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1805 1806 messageEvaluationContext.setMessageReference(r); 1807 if (messageEvaluationContext.getDestination() == null) { 1808 messageEvaluationContext.setDestination(getActiveMQDestination()); 1809 } 1810 1811 return selectorExpression.matches(messageEvaluationContext); 1812 } 1813 }; 1814 } 1815 1816 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException { 1817 removeMessage(c, null, r); 1818 pagedInPendingDispatchLock.writeLock().lock(); 1819 try { 1820 dispatchPendingList.remove(r); 1821 } finally { 1822 pagedInPendingDispatchLock.writeLock().unlock(); 1823 } 1824 } 1825 1826 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException { 1827 MessageAck ack = new MessageAck(); 1828 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 1829 ack.setDestination(destination); 1830 ack.setMessageID(r.getMessageId()); 1831 removeMessage(c, subs, r, ack); 1832 } 1833 1834 protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, 1835 MessageAck ack) throws IOException { 1836 LOG.trace("ack of {} with {}", reference.getMessageId(), ack); 1837 // This sends the ack the the journal.. 
1838 if (!ack.isInTransaction()) { 1839 acknowledge(context, sub, ack, reference); 1840 dropMessage(reference); 1841 } else { 1842 try { 1843 acknowledge(context, sub, ack, reference); 1844 } finally { 1845 context.getTransaction().addSynchronization(new Synchronization() { 1846 1847 @Override 1848 public void afterCommit() throws Exception { 1849 dropMessage(reference); 1850 wakeup(); 1851 } 1852 1853 @Override 1854 public void afterRollback() throws Exception { 1855 reference.setAcked(false); 1856 wakeup(); 1857 } 1858 }); 1859 } 1860 } 1861 if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) { 1862 // message gone to DLQ, is ok to allow redelivery 1863 messagesLock.writeLock().lock(); 1864 try { 1865 messages.rollback(reference.getMessageId()); 1866 } finally { 1867 messagesLock.writeLock().unlock(); 1868 } 1869 if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) { 1870 getDestinationStatistics().getForwards().increment(); 1871 } 1872 } 1873 // after successful store update 1874 reference.setAcked(true); 1875 } 1876 1877 private void dropMessage(QueueMessageReference reference) { 1878 //use dropIfLive so we only process the statistics at most one time 1879 if (reference.dropIfLive()) { 1880 getDestinationStatistics().getDequeues().increment(); 1881 getDestinationStatistics().getMessages().decrement(); 1882 pagedInMessagesLock.writeLock().lock(); 1883 try { 1884 pagedInMessages.remove(reference); 1885 } finally { 1886 pagedInMessagesLock.writeLock().unlock(); 1887 } 1888 } 1889 } 1890 1891 public void messageExpired(ConnectionContext context, MessageReference reference) { 1892 messageExpired(context, null, reference); 1893 } 1894 1895 @Override 1896 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 1897 LOG.debug("message expired: {}", reference); 1898 broker.messageExpired(context, reference, subs); 1899 destinationStatistics.getExpired().increment(); 1900 try 
{ 1901 removeMessage(context, subs, (QueueMessageReference) reference); 1902 messagesLock.writeLock().lock(); 1903 try { 1904 messages.rollback(reference.getMessageId()); 1905 } finally { 1906 messagesLock.writeLock().unlock(); 1907 } 1908 } catch (IOException e) { 1909 LOG.error("Failed to remove expired Message from the store ", e); 1910 } 1911 } 1912 1913 final boolean cursorAdd(final Message msg) throws Exception { 1914 messagesLock.writeLock().lock(); 1915 try { 1916 return messages.addMessageLast(msg); 1917 } finally { 1918 messagesLock.writeLock().unlock(); 1919 } 1920 } 1921 1922 final void messageSent(final ConnectionContext context, final Message msg) throws Exception { 1923 pendingSends.decrementAndGet(); 1924 destinationStatistics.getEnqueues().increment(); 1925 destinationStatistics.getMessages().increment(); 1926 destinationStatistics.getMessageSize().addSize(msg.getSize()); 1927 messageDelivered(context, msg); 1928 consumersLock.readLock().lock(); 1929 try { 1930 if (consumers.isEmpty()) { 1931 onMessageWithNoConsumers(context, msg); 1932 } 1933 }finally { 1934 consumersLock.readLock().unlock(); 1935 } 1936 LOG.debug("{} Message {} sent to {}", new Object[]{ broker.getBrokerName(), msg.getMessageId(), this.destination }); 1937 wakeup(); 1938 } 1939 1940 @Override 1941 public void wakeup() { 1942 if (optimizedDispatch && !iterationRunning) { 1943 iterate(); 1944 pendingWakeups.incrementAndGet(); 1945 } else { 1946 asyncWakeup(); 1947 } 1948 } 1949 1950 private void asyncWakeup() { 1951 try { 1952 pendingWakeups.incrementAndGet(); 1953 this.taskRunner.wakeup(); 1954 } catch (InterruptedException e) { 1955 LOG.warn("Async task runner failed to wakeup ", e); 1956 } 1957 } 1958 1959 private void doPageIn(boolean force) throws Exception { 1960 doPageIn(force, true, getMaxPageSize()); 1961 } 1962 1963 private void doPageIn(boolean force, boolean processExpired, int maxPageSize) throws Exception { 1964 PendingList newlyPaged = doPageInForDispatch(force, 
processExpired, maxPageSize); 1965 pagedInPendingDispatchLock.writeLock().lock(); 1966 try { 1967 if (dispatchPendingList.isEmpty()) { 1968 dispatchPendingList.addAll(newlyPaged); 1969 1970 } else { 1971 for (MessageReference qmr : newlyPaged) { 1972 if (!dispatchPendingList.contains(qmr)) { 1973 dispatchPendingList.addMessageLast(qmr); 1974 } 1975 } 1976 } 1977 } finally { 1978 pagedInPendingDispatchLock.writeLock().unlock(); 1979 } 1980 } 1981 1982 private PendingList doPageInForDispatch(boolean force, boolean processExpired, int maxPageSize) throws Exception { 1983 List<QueueMessageReference> result = null; 1984 PendingList resultList = null; 1985 1986 int toPageIn = maxPageSize; 1987 messagesLock.readLock().lock(); 1988 try { 1989 toPageIn = Math.min(toPageIn, messages.size()); 1990 } finally { 1991 messagesLock.readLock().unlock(); 1992 } 1993 int pagedInPendingSize = 0; 1994 pagedInPendingDispatchLock.readLock().lock(); 1995 try { 1996 pagedInPendingSize = dispatchPendingList.size(); 1997 } finally { 1998 pagedInPendingDispatchLock.readLock().unlock(); 1999 } 2000 if (isLazyDispatch() && !force) { 2001 // Only page in the minimum number of messages which can be 2002 // dispatched immediately. 
2003 toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull()); 2004 } 2005 2006 if (LOG.isDebugEnabled()) { 2007 LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}", 2008 new Object[]{ 2009 this, 2010 toPageIn, 2011 force, 2012 destinationStatistics.getInflight().getCount(), 2013 pagedInMessages.size(), 2014 pagedInPendingSize, 2015 destinationStatistics.getEnqueues().getCount(), 2016 destinationStatistics.getDequeues().getCount(), 2017 getMemoryUsage().getUsage(), 2018 maxPageSize 2019 }); 2020 } 2021 2022 if (toPageIn > 0 && (force || (haveRealConsumer() && pagedInPendingSize < maxPageSize))) { 2023 int count = 0; 2024 result = new ArrayList<QueueMessageReference>(toPageIn); 2025 messagesLock.writeLock().lock(); 2026 try { 2027 try { 2028 messages.setMaxBatchSize(toPageIn); 2029 messages.reset(); 2030 while (count < toPageIn && messages.hasNext()) { 2031 MessageReference node = messages.next(); 2032 messages.remove(); 2033 2034 QueueMessageReference ref = createMessageReference(node.getMessage()); 2035 if (processExpired && ref.isExpired()) { 2036 if (broker.isExpired(ref)) { 2037 messageExpired(createConnectionContext(), ref); 2038 } else { 2039 ref.decrementReferenceCount(); 2040 } 2041 } else { 2042 result.add(ref); 2043 count++; 2044 } 2045 } 2046 } finally { 2047 messages.release(); 2048 } 2049 } finally { 2050 messagesLock.writeLock().unlock(); 2051 } 2052 2053 if (count > 0) { 2054 // Only add new messages, not already pagedIn to avoid multiple 2055 // dispatch attempts 2056 pagedInMessagesLock.writeLock().lock(); 2057 try { 2058 if (isPrioritizedMessages()) { 2059 resultList = new PrioritizedPendingList(); 2060 } else { 2061 resultList = new OrderedPendingList(); 2062 } 2063 for (QueueMessageReference ref : result) { 2064 if (!pagedInMessages.contains(ref)) { 2065 pagedInMessages.addMessageLast(ref); 2066 
resultList.addMessageLast(ref); 2067 } else { 2068 ref.decrementReferenceCount(); 2069 // store should have trapped duplicate in it's index, or cursor audit trapped insert 2070 // or producerBrokerExchange suppressed send. 2071 // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id 2072 LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); 2073 if (store != null) { 2074 ConnectionContext connectionContext = createConnectionContext(); 2075 dropMessage(ref); 2076 if (gotToTheStore(ref.getMessage())) { 2077 LOG.debug("Duplicate message {} from cursor, removing from store", this, ref.getMessage()); 2078 store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1)); 2079 } 2080 broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination)); 2081 } 2082 } 2083 } 2084 } finally { 2085 pagedInMessagesLock.writeLock().unlock(); 2086 } 2087 } else if (!messages.hasSpace()) { 2088 if (isFlowControlLogRequired()) { 2089 LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); 2090 } else { 2091 LOG.debug("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); 2092 } 2093 } 2094 } 2095 2096 // Avoid return null list, if condition is not validated 2097 return resultList != null ? 
resultList : new OrderedPendingList(); 2098 } 2099 2100 private final boolean haveRealConsumer() { 2101 return consumers.size() - browserDispatches.size() > 0; 2102 } 2103 2104 private void doDispatch(PendingList list) throws Exception { 2105 boolean doWakeUp = false; 2106 2107 pagedInPendingDispatchLock.writeLock().lock(); 2108 try { 2109 if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && list != null && !list.isEmpty()) { 2110 // merge all to select priority order 2111 for (MessageReference qmr : list) { 2112 if (!dispatchPendingList.contains(qmr)) { 2113 dispatchPendingList.addMessageLast(qmr); 2114 } 2115 } 2116 list = null; 2117 } 2118 2119 doActualDispatch(dispatchPendingList); 2120 // and now see if we can dispatch the new stuff.. and append to the pending 2121 // list anything that does not actually get dispatched. 2122 if (list != null && !list.isEmpty()) { 2123 if (dispatchPendingList.isEmpty()) { 2124 dispatchPendingList.addAll(doActualDispatch(list)); 2125 } else { 2126 for (MessageReference qmr : list) { 2127 if (!dispatchPendingList.contains(qmr)) { 2128 dispatchPendingList.addMessageLast(qmr); 2129 } 2130 } 2131 doWakeUp = true; 2132 } 2133 } 2134 } finally { 2135 pagedInPendingDispatchLock.writeLock().unlock(); 2136 } 2137 2138 if (doWakeUp) { 2139 // avoid lock order contention 2140 asyncWakeup(); 2141 } 2142 } 2143 2144 /** 2145 * @return list of messages that could get dispatched to consumers if they 2146 * were not full. 
2147 */ 2148 private PendingList doActualDispatch(PendingList list) throws Exception { 2149 List<Subscription> consumers; 2150 consumersLock.readLock().lock(); 2151 2152 try { 2153 if (this.consumers.isEmpty()) { 2154 // slave dispatch happens in processDispatchNotification 2155 return list; 2156 } 2157 consumers = new ArrayList<Subscription>(this.consumers); 2158 } finally { 2159 consumersLock.readLock().unlock(); 2160 } 2161 2162 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); 2163 2164 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 2165 2166 MessageReference node = iterator.next(); 2167 Subscription target = null; 2168 for (Subscription s : consumers) { 2169 if (s instanceof QueueBrowserSubscription) { 2170 continue; 2171 } 2172 if (!fullConsumers.contains(s)) { 2173 if (!s.isFull()) { 2174 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) { 2175 // Dispatch it. 2176 s.add(node); 2177 LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId()); 2178 iterator.remove(); 2179 target = s; 2180 break; 2181 } 2182 } else { 2183 // no further dispatch of list to a full consumer to 2184 // avoid out of order message receipt 2185 fullConsumers.add(s); 2186 LOG.trace("Subscription full {}", s); 2187 } 2188 } 2189 } 2190 2191 if (target == null && node.isDropped()) { 2192 iterator.remove(); 2193 } 2194 2195 // return if there are no consumers or all consumers are full 2196 if (target == null && consumers.size() == fullConsumers.size()) { 2197 return list; 2198 } 2199 2200 // If it got dispatched, rotate the consumer list to get round robin 2201 // distribution. 
2202 if (target != null && !strictOrderDispatch && consumers.size() > 1 2203 && !dispatchSelector.isExclusiveConsumer(target)) { 2204 consumersLock.writeLock().lock(); 2205 try { 2206 if (removeFromConsumerList(target)) { 2207 addToConsumerList(target); 2208 consumers = new ArrayList<Subscription>(this.consumers); 2209 } 2210 } finally { 2211 consumersLock.writeLock().unlock(); 2212 } 2213 } 2214 } 2215 2216 return list; 2217 } 2218 2219 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { 2220 boolean result = true; 2221 // Keep message groups together. 2222 String groupId = node.getGroupID(); 2223 int sequence = node.getGroupSequence(); 2224 if (groupId != null) { 2225 2226 MessageGroupMap messageGroupOwners = getMessageGroupOwners(); 2227 // If we can own the first, then no-one else should own the 2228 // rest. 2229 if (sequence == 1) { 2230 assignGroup(subscription, messageGroupOwners, node, groupId); 2231 } else { 2232 2233 // Make sure that the previous owner is still valid, we may 2234 // need to become the new owner. 2235 ConsumerId groupOwner; 2236 2237 groupOwner = messageGroupOwners.get(groupId); 2238 if (groupOwner == null) { 2239 assignGroup(subscription, messageGroupOwners, node, groupId); 2240 } else { 2241 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { 2242 // A group sequence < 1 is an end of group signal. 
2243 if (sequence < 0) { 2244 messageGroupOwners.removeGroup(groupId); 2245 subscription.getConsumerInfo().decrementAssignedGroupCount(destination); 2246 } 2247 } else { 2248 result = false; 2249 } 2250 } 2251 } 2252 } 2253 2254 return result; 2255 } 2256 2257 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { 2258 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); 2259 Message message = n.getMessage(); 2260 message.setJMSXGroupFirstForConsumer(true); 2261 subs.getConsumerInfo().incrementAssignedGroupCount(destination); 2262 } 2263 2264 protected void pageInMessages(boolean force, int maxPageSize) throws Exception { 2265 doDispatch(doPageInForDispatch(force, true, maxPageSize)); 2266 } 2267 2268 private void addToConsumerList(Subscription sub) { 2269 if (useConsumerPriority) { 2270 consumers.add(sub); 2271 Collections.sort(consumers, orderedCompare); 2272 } else { 2273 consumers.add(sub); 2274 } 2275 } 2276 2277 private boolean removeFromConsumerList(Subscription sub) { 2278 return consumers.remove(sub); 2279 } 2280 2281 private int getConsumerMessageCountBeforeFull() throws Exception { 2282 int total = 0; 2283 consumersLock.readLock().lock(); 2284 try { 2285 for (Subscription s : consumers) { 2286 if (s.isBrowser()) { 2287 continue; 2288 } 2289 int countBeforeFull = s.countBeforeFull(); 2290 total += countBeforeFull; 2291 } 2292 } finally { 2293 consumersLock.readLock().unlock(); 2294 } 2295 return total; 2296 } 2297 2298 /* 2299 * In slave mode, dispatch is ignored till we get this notification as the 2300 * dispatch process is non deterministic between master and slave. On a 2301 * notification, the actual dispatch to the subscription (as chosen by the 2302 * master) is completed. 
(non-Javadoc) 2303 * @see 2304 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification 2305 * (org.apache.activemq.command.MessageDispatchNotification) 2306 */ 2307 @Override 2308 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 2309 // do dispatch 2310 Subscription sub = getMatchingSubscription(messageDispatchNotification); 2311 if (sub != null) { 2312 MessageReference message = getMatchingMessage(messageDispatchNotification); 2313 sub.add(message); 2314 sub.processMessageDispatchNotification(messageDispatchNotification); 2315 } 2316 } 2317 2318 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) 2319 throws Exception { 2320 QueueMessageReference message = null; 2321 MessageId messageId = messageDispatchNotification.getMessageId(); 2322 2323 pagedInPendingDispatchLock.writeLock().lock(); 2324 try { 2325 for (MessageReference ref : dispatchPendingList) { 2326 if (messageId.equals(ref.getMessageId())) { 2327 message = (QueueMessageReference)ref; 2328 dispatchPendingList.remove(ref); 2329 break; 2330 } 2331 } 2332 } finally { 2333 pagedInPendingDispatchLock.writeLock().unlock(); 2334 } 2335 2336 if (message == null) { 2337 pagedInMessagesLock.readLock().lock(); 2338 try { 2339 message = (QueueMessageReference)pagedInMessages.get(messageId); 2340 } finally { 2341 pagedInMessagesLock.readLock().unlock(); 2342 } 2343 } 2344 2345 if (message == null) { 2346 messagesLock.writeLock().lock(); 2347 try { 2348 try { 2349 messages.setMaxBatchSize(getMaxPageSize()); 2350 messages.reset(); 2351 while (messages.hasNext()) { 2352 MessageReference node = messages.next(); 2353 messages.remove(); 2354 if (messageId.equals(node.getMessageId())) { 2355 message = this.createMessageReference(node.getMessage()); 2356 break; 2357 } 2358 } 2359 } finally { 2360 messages.release(); 2361 } 2362 } finally { 2363 messagesLock.writeLock().unlock(); 2364 
} 2365 } 2366 2367 if (message == null) { 2368 Message msg = loadMessage(messageId); 2369 if (msg != null) { 2370 message = this.createMessageReference(msg); 2371 } 2372 } 2373 2374 if (message == null) { 2375 throw new JMSException("Slave broker out of sync with master - Message: " 2376 + messageDispatchNotification.getMessageId() + " on " 2377 + messageDispatchNotification.getDestination() + " does not exist among pending(" 2378 + dispatchPendingList.size() + ") for subscription: " 2379 + messageDispatchNotification.getConsumerId()); 2380 } 2381 return message; 2382 } 2383 2384 /** 2385 * Find a consumer that matches the id in the message dispatch notification 2386 * 2387 * @param messageDispatchNotification 2388 * @return sub or null if the subscription has been removed before dispatch 2389 * @throws JMSException 2390 */ 2391 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) 2392 throws JMSException { 2393 Subscription sub = null; 2394 consumersLock.readLock().lock(); 2395 try { 2396 for (Subscription s : consumers) { 2397 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) { 2398 sub = s; 2399 break; 2400 } 2401 } 2402 } finally { 2403 consumersLock.readLock().unlock(); 2404 } 2405 return sub; 2406 } 2407 2408 @Override 2409 public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) { 2410 if (oldPercentUsage > newPercentUsage) { 2411 asyncWakeup(); 2412 } 2413 } 2414 2415 @Override 2416 protected Logger getLog() { 2417 return LOG; 2418 } 2419 2420 protected boolean isOptimizeStorage(){ 2421 boolean result = false; 2422 if (isDoOptimzeMessageStorage()){ 2423 consumersLock.readLock().lock(); 2424 try{ 2425 if (consumers.isEmpty()==false){ 2426 result = true; 2427 for (Subscription s : consumers) { 2428 if (s.getPrefetchSize()==0){ 2429 result = false; 2430 break; 2431 } 2432 if (s.isSlowConsumer()){ 2433 result = 
false; 2434 break; 2435 } 2436 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 2437 result = false; 2438 break; 2439 } 2440 } 2441 } 2442 } finally { 2443 consumersLock.readLock().unlock(); 2444 } 2445 } 2446 return result; 2447 } 2448}