001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import static org.apache.activemq.transaction.Transaction.IN_USE_STATE; 020import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Comparator; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.LinkedHashMap; 029import java.util.LinkedHashSet; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.CancellationException; 035import java.util.concurrent.ConcurrentLinkedQueue; 036import java.util.concurrent.CountDownLatch; 037import java.util.concurrent.DelayQueue; 038import java.util.concurrent.Delayed; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.concurrent.locks.Lock; 044import java.util.concurrent.locks.ReentrantLock; 045import java.util.concurrent.locks.ReentrantReadWriteLock; 046 047import javax.jms.InvalidSelectorException; 048import javax.jms.JMSException; 049import javax.jms.ResourceAllocationException; 050 051import org.apache.activemq.broker.BrokerService; 052import org.apache.activemq.broker.ConnectionContext; 053import org.apache.activemq.broker.ProducerBrokerExchange; 054import org.apache.activemq.broker.region.cursors.OrderedPendingList; 055import org.apache.activemq.broker.region.cursors.PendingList; 056import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 057import org.apache.activemq.broker.region.cursors.PrioritizedPendingList; 058import org.apache.activemq.broker.region.cursors.QueueDispatchPendingList; 059import org.apache.activemq.broker.region.cursors.StoreQueueCursor; 060import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 061import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory; 062import org.apache.activemq.broker.region.group.MessageGroupMap; 063import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 064import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 065import org.apache.activemq.broker.region.policy.DispatchPolicy; 066import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 067import org.apache.activemq.broker.util.InsertionCountList; 068import org.apache.activemq.command.ActiveMQDestination; 069import org.apache.activemq.command.ActiveMQMessage; 070import org.apache.activemq.command.ConsumerId; 071import org.apache.activemq.command.ExceptionResponse; 072import org.apache.activemq.command.Message; 073import org.apache.activemq.command.MessageAck; 074import org.apache.activemq.command.MessageDispatchNotification; 075import org.apache.activemq.command.MessageId; 076import org.apache.activemq.command.ProducerAck; 077import org.apache.activemq.command.ProducerInfo; 078import org.apache.activemq.command.RemoveInfo; 079import org.apache.activemq.command.Response; 080import org.apache.activemq.filter.BooleanExpression; 081import org.apache.activemq.filter.MessageEvaluationContext; 082import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 083import org.apache.activemq.selector.SelectorParser; 084import org.apache.activemq.state.ProducerState; 085import org.apache.activemq.store.IndexListener; 086import org.apache.activemq.store.ListenableFuture; 087import org.apache.activemq.store.MessageRecoveryListener; 088import org.apache.activemq.store.MessageStore; 089import org.apache.activemq.thread.Task; 090import org.apache.activemq.thread.TaskRunner; 091import org.apache.activemq.thread.TaskRunnerFactory; 092import org.apache.activemq.transaction.Synchronization; 093import org.apache.activemq.usage.Usage; 094import org.apache.activemq.usage.UsageListener; 095import org.apache.activemq.util.BrokerSupport; 096import org.apache.activemq.util.ThreadPoolUtils; 097import org.slf4j.Logger; 098import org.slf4j.LoggerFactory; 099import org.slf4j.MDC; 100 101/** 102 * The Queue is a List of MessageEntry objects that are dispatched to matching 103 * subscriptions. 104 */ 105public class Queue extends BaseDestination implements Task, UsageListener, IndexListener { 106 protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); 107 protected final TaskRunnerFactory taskFactory; 108 protected TaskRunner taskRunner; 109 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 110 protected final List<Subscription> consumers = new ArrayList<Subscription>(50); 111 private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock(); 112 protected PendingMessageCursor messages; 113 private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); 114 private final PendingList pagedInMessages = new OrderedPendingList(); 115 // Messages that are paged in but have not yet been targeted at a subscription 116 private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); 117 protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList(); 118 private AtomicInteger pendingSends = new AtomicInteger(0); 119 private MessageGroupMap messageGroupOwners; 120 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); 121 private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); 122 final Lock sendLock = new ReentrantLock(); 123 private ExecutorService executor; 124 private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>(); 125 private boolean useConsumerPriority = true; 126 private boolean strictOrderDispatch = false; 127 private final QueueDispatchSelector dispatchSelector; 128 private boolean optimizedDispatch = false; 129 private boolean iterationRunning = false; 130 private boolean firstConsumer = false; 131 private int timeBeforeDispatchStarts = 0; 132 private int consumersBeforeDispatchStarts = 0; 133 private CountDownLatch consumersBeforeStartsLatch; 134 private final AtomicLong pendingWakeups = new AtomicLong(); 135 private boolean allConsumersExclusiveByDefault = false; 136 137 private boolean resetNeeded; 138 139 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 140 @Override 141 public void run() { 142 asyncWakeup(); 143 } 144 }; 145 private final Runnable expireMessagesTask = new Runnable() { 146 @Override 147 public void run() { 148 expireMessages(); 149 } 150 }; 151 152 private final Object iteratingMutex = new Object(); 153 154 // gate on enabling cursor cache to ensure no outstanding sync 155 // send before async sends resume 156 public boolean singlePendingSend() { 157 return pendingSends.get() <= 1; 158 } 159 160 class TimeoutMessage implements Delayed { 161 162 Message message; 163 ConnectionContext context; 164 long trigger; 165 166 public TimeoutMessage(Message message, ConnectionContext context, long delay) { 167 this.message = message; 168 this.context = context; 169 this.trigger = System.currentTimeMillis() + delay; 170 } 171 172 @Override 173 public long getDelay(TimeUnit unit) { 174 long n = trigger - System.currentTimeMillis(); 175 return unit.convert(n, TimeUnit.MILLISECONDS); 176 } 177 178 @Override 179 public int compareTo(Delayed delayed) { 180 long other = ((TimeoutMessage) delayed).trigger; 181 int returnValue; 182 if (this.trigger < other) { 183 returnValue = -1; 184 } else if (this.trigger > other) { 185 returnValue = 1; 186 } else { 187 returnValue = 0; 188 } 189 return returnValue; 190 } 191 } 192 193 DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>(); 194 195 class FlowControlTimeoutTask extends Thread { 196 197 @Override 198 public void run() { 199 TimeoutMessage timeout; 200 try { 201 while (true) { 202 timeout = flowControlTimeoutMessages.take(); 203 if (timeout != null) { 204 synchronized (messagesWaitingForSpace) { 205 if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) { 206 ExceptionResponse response = new ExceptionResponse( 207 new ResourceAllocationException( 208 "Usage Manager Memory Limit Wait Timeout. Stopping producer (" 209 + timeout.message.getProducerId() 210 + ") to prevent flooding " 211 + getActiveMQDestination().getQualifiedName() 212 + "." 213 + " See http://activemq.apache.org/producer-flow-control.html for more info")); 214 response.setCorrelationId(timeout.message.getCommandId()); 215 timeout.context.getConnection().dispatchAsync(response); 216 } 217 } 218 } 219 } 220 } catch (InterruptedException e) { 221 LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping"); 222 } 223 } 224 } 225 226 private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask(); 227 228 private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() { 229 230 @Override 231 public int compare(Subscription s1, Subscription s2) { 232 // We want the list sorted in descending order 233 int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); 234 if (val == 0 && messageGroupOwners != null) { 235 // then ascending order of assigned message groups to favour less loaded consumers 236 // Long.compare in jdk7 237 long x = s1.getConsumerInfo().getAssignedGroupCount(destination); 238 long y = s2.getConsumerInfo().getAssignedGroupCount(destination); 239 val = (x < y) ? -1 : ((x == y) ? 0 : 1); 240 } 241 return val; 242 } 243 }; 244 245 public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, 246 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 247 super(brokerService, store, destination, parentStats); 248 this.taskFactory = taskFactory; 249 this.dispatchSelector = new QueueDispatchSelector(destination); 250 if (store != null) { 251 store.registerIndexListener(this); 252 } 253 } 254 255 @Override 256 public List<Subscription> getConsumers() { 257 consumersLock.readLock().lock(); 258 try { 259 return new ArrayList<Subscription>(consumers); 260 } finally { 261 consumersLock.readLock().unlock(); 262 } 263 } 264 265 // make the queue easily visible in the debugger from its task runner 266 // threads 267 final class QueueThread extends Thread { 268 final Queue queue; 269 270 public QueueThread(Runnable runnable, String name, Queue queue) { 271 super(runnable, name); 272 this.queue = queue; 273 } 274 } 275 276 class BatchMessageRecoveryListener implements MessageRecoveryListener { 277 final LinkedList<Message> toExpire = new LinkedList<Message>(); 278 final double totalMessageCount; 279 int recoveredAccumulator = 0; 280 int currentBatchCount; 281 282 BatchMessageRecoveryListener(int totalMessageCount) { 283 this.totalMessageCount = totalMessageCount; 284 currentBatchCount = recoveredAccumulator; 285 } 286 287 @Override 288 public boolean recoverMessage(Message message) { 289 recoveredAccumulator++; 290 if ((recoveredAccumulator % 10000) == 0) { 291 LOG.info("cursor for {} has recovered {} messages. {}% complete", new Object[]{ getActiveMQDestination().getQualifiedName(), recoveredAccumulator, new Integer((int) (recoveredAccumulator * 100 / totalMessageCount))}); 292 } 293 // Message could have expired while it was being 294 // loaded.. 295 message.setRegionDestination(Queue.this); 296 if (message.isExpired() && broker.isExpired(message)) { 297 toExpire.add(message); 298 return true; 299 } 300 if (hasSpace()) { 301 messagesLock.writeLock().lock(); 302 try { 303 try { 304 messages.addMessageLast(message); 305 } catch (Exception e) { 306 LOG.error("Failed to add message to cursor", e); 307 } 308 } finally { 309 messagesLock.writeLock().unlock(); 310 } 311 destinationStatistics.getMessages().increment(); 312 return true; 313 } 314 return false; 315 } 316 317 @Override 318 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 319 throw new RuntimeException("Should not be called."); 320 } 321 322 @Override 323 public boolean hasSpace() { 324 return true; 325 } 326 327 @Override 328 public boolean isDuplicate(MessageId id) { 329 return false; 330 } 331 332 public void reset() { 333 currentBatchCount = recoveredAccumulator; 334 } 335 336 public void processExpired() { 337 for (Message message: toExpire) { 338 messageExpired(createConnectionContext(), createMessageReference(message)); 339 // drop message will decrement so counter 340 // balance here 341 destinationStatistics.getMessages().increment(); 342 } 343 toExpire.clear(); 344 } 345 346 public boolean done() { 347 return currentBatchCount == recoveredAccumulator; 348 } 349 } 350 351 @Override 352 public void setPrioritizedMessages(boolean prioritizedMessages) { 353 super.setPrioritizedMessages(prioritizedMessages); 354 dispatchPendingList.setPrioritizedMessages(prioritizedMessages); 355 } 356 357 @Override 358 public void initialize() throws Exception { 359 360 if (this.messages == null) { 361 if (destination.isTemporary() || broker == null || store == null) { 362 this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); 363 } else { 364 this.messages = new StoreQueueCursor(broker, this); 365 } 366 } 367 368 // If a VMPendingMessageCursor don't use the default Producer System 369 // Usage 370 // since it turns into a shared blocking queue which can lead to a 371 // network deadlock. 372 // If we are cursoring to disk..it's not and issue because it does not 373 // block due 374 // to large disk sizes. 375 if (messages instanceof VMPendingMessageCursor) { 376 this.systemUsage = brokerService.getSystemUsage(); 377 memoryUsage.setParent(systemUsage.getMemoryUsage()); 378 } 379 380 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); 381 382 super.initialize(); 383 if (store != null) { 384 // Restore the persistent messages. 385 messages.setSystemUsage(systemUsage); 386 messages.setEnableAudit(isEnableAudit()); 387 messages.setMaxAuditDepth(getMaxAuditDepth()); 388 messages.setMaxProducersToAudit(getMaxProducersToAudit()); 389 messages.setUseCache(isUseCache()); 390 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 391 final int messageCount = store.getMessageCount(); 392 if (messageCount > 0 && messages.isRecoveryRequired()) { 393 BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount); 394 do { 395 listener.reset(); 396 store.recoverNextMessages(getMaxPageSize(), listener); 397 listener.processExpired(); 398 } while (!listener.done()); 399 } else { 400 destinationStatistics.getMessages().add(messageCount); 401 } 402 } 403 } 404 405 /* 406 * Holder for subscription that needs attention on next iterate browser 407 * needs access to existing messages in the queue that have already been 408 * dispatched 409 */ 410 class BrowserDispatch { 411 QueueBrowserSubscription browser; 412 413 public BrowserDispatch(QueueBrowserSubscription browserSubscription) { 414 browser = browserSubscription; 415 browser.incrementQueueRef(); 416 } 417 418 public QueueBrowserSubscription getBrowser() { 419 return browser; 420 } 421 } 422 423 ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>(); 424 425 @Override 426 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 427 LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() }); 428 429 super.addSubscription(context, sub); 430 // synchronize with dispatch method so that no new messages are sent 431 // while setting up a subscription. avoid out of order messages, 432 // duplicates, etc. 433 pagedInPendingDispatchLock.writeLock().lock(); 434 try { 435 436 sub.add(context, this); 437 438 // needs to be synchronized - so no contention with dispatching 439 // consumersLock. 440 consumersLock.writeLock().lock(); 441 try { 442 // set a flag if this is a first consumer 443 if (consumers.size() == 0) { 444 firstConsumer = true; 445 if (consumersBeforeDispatchStarts != 0) { 446 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); 447 } 448 } else { 449 if (consumersBeforeStartsLatch != null) { 450 consumersBeforeStartsLatch.countDown(); 451 } 452 } 453 454 addToConsumerList(sub); 455 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) { 456 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 457 if (exclusiveConsumer == null) { 458 exclusiveConsumer = sub; 459 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE || 460 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) { 461 exclusiveConsumer = sub; 462 } 463 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 464 } 465 } finally { 466 consumersLock.writeLock().unlock(); 467 } 468 469 if (sub instanceof QueueBrowserSubscription) { 470 // tee up for dispatch in next iterate 471 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; 472 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); 473 browserDispatches.add(browserDispatch); 474 } 475 476 if (!this.optimizedDispatch) { 477 wakeup(); 478 } 479 } finally { 480 pagedInPendingDispatchLock.writeLock().unlock(); 481 } 482 if (this.optimizedDispatch) { 483 // Outside of dispatchLock() to maintain the lock hierarchy of 484 // iteratingMutex -> dispatchLock. - see 485 // https://issues.apache.org/activemq/browse/AMQ-1878 486 wakeup(); 487 } 488 } 489 490 @Override 491 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) 492 throws Exception { 493 super.removeSubscription(context, sub, lastDeliveredSequenceId); 494 // synchronize with dispatch method so that no new messages are sent 495 // while removing up a subscription. 496 pagedInPendingDispatchLock.writeLock().lock(); 497 try { 498 LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{ 499 getActiveMQDestination().getQualifiedName(), 500 sub, 501 lastDeliveredSequenceId, 502 getDestinationStatistics().getDequeues().getCount(), 503 getDestinationStatistics().getDispatched().getCount(), 504 getDestinationStatistics().getInflight().getCount(), 505 sub.getConsumerInfo().getAssignedGroupCount(destination) 506 }); 507 consumersLock.writeLock().lock(); 508 try { 509 removeFromConsumerList(sub); 510 if (sub.getConsumerInfo().isExclusive()) { 511 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 512 if (exclusiveConsumer == sub) { 513 exclusiveConsumer = null; 514 for (Subscription s : consumers) { 515 if (s.getConsumerInfo().isExclusive() 516 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer 517 .getConsumerInfo().getPriority())) { 518 exclusiveConsumer = s; 519 520 } 521 } 522 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 523 } 524 } else if (isAllConsumersExclusiveByDefault()) { 525 Subscription exclusiveConsumer = null; 526 for (Subscription s : consumers) { 527 if (exclusiveConsumer == null 528 || s.getConsumerInfo().getPriority() > exclusiveConsumer 529 .getConsumerInfo().getPriority()) { 530 exclusiveConsumer = s; 531 } 532 } 533 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 534 } 535 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); 536 getMessageGroupOwners().removeConsumer(consumerId); 537 538 // redeliver inflight messages 539 540 boolean markAsRedelivered = false; 541 MessageReference lastDeliveredRef = null; 542 List<MessageReference> unAckedMessages = sub.remove(context, this); 543 544 // locate last redelivered in unconsumed list (list in delivery rather than seq order) 545 if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) { 546 for (MessageReference ref : unAckedMessages) { 547 if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) { 548 lastDeliveredRef = ref; 549 markAsRedelivered = true; 550 LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId()); 551 break; 552 } 553 } 554 } 555 556 for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) { 557 MessageReference ref = unackedListIterator.next(); 558 // AMQ-5107: don't resend if the broker is shutting down 559 if ( this.brokerService.isStopping() ) { 560 break; 561 } 562 QueueMessageReference qmr = (QueueMessageReference) ref; 563 if (qmr.getLockOwner() == sub) { 564 qmr.unlock(); 565 566 // have no delivery information 567 if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) { 568 qmr.incrementRedeliveryCounter(); 569 } else { 570 if (markAsRedelivered) { 571 qmr.incrementRedeliveryCounter(); 572 } 573 if (ref == lastDeliveredRef) { 574 // all that follow were not redelivered 575 markAsRedelivered = false; 576 } 577 } 578 } 579 if (qmr.isDropped()) { 580 unackedListIterator.remove(); 581 } 582 } 583 dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty()); 584 if (sub instanceof QueueBrowserSubscription) { 585 ((QueueBrowserSubscription)sub).decrementQueueRef(); 586 browserDispatches.remove(sub); 587 } 588 // AMQ-5107: don't resend if the broker is shutting down 589 if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) { 590 doDispatch(new OrderedPendingList()); 591 } 592 } finally { 593 consumersLock.writeLock().unlock(); 594 } 595 if (!this.optimizedDispatch) { 596 wakeup(); 597 } 598 } finally { 599 pagedInPendingDispatchLock.writeLock().unlock(); 600 } 601 if (this.optimizedDispatch) { 602 // Outside of dispatchLock() to maintain the lock hierarchy of 603 // iteratingMutex -> dispatchLock. - see 604 // https://issues.apache.org/activemq/browse/AMQ-1878 605 wakeup(); 606 } 607 } 608 609 private volatile ResourceAllocationException sendMemAllocationException = null; 610 @Override 611 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 612 final ConnectionContext context = producerExchange.getConnectionContext(); 613 // There is delay between the client sending it and it arriving at the 614 // destination.. it may have expired. 615 message.setRegionDestination(this); 616 ProducerState state = producerExchange.getProducerState(); 617 if (state == null) { 618 LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange); 619 throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state"); 620 } 621 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 622 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 623 && !context.isInRecoveryMode(); 624 if (message.isExpired()) { 625 // message not stored - or added to stats yet - so chuck here 626 broker.getRoot().messageExpired(context, message, null); 627 if (sendProducerAck) { 628 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 629 context.getConnection().dispatchAsync(ack); 630 } 631 return; 632 } 633 if (memoryUsage.isFull()) { 634 isFull(context, memoryUsage); 635 fastProducer(context, producerInfo); 636 if (isProducerFlowControl() && context.isProducerFlowControl()) { 637 if (isFlowControlLogRequired()) { 638 LOG.warn("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 639 memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); 640 } else { 641 LOG.debug("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 642 memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); 643 } 644 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 645 ResourceAllocationException resourceAllocationException = sendMemAllocationException; 646 if (resourceAllocationException == null) { 647 synchronized (this) { 648 resourceAllocationException = sendMemAllocationException; 649 if (resourceAllocationException == null) { 650 sendMemAllocationException = resourceAllocationException = new ResourceAllocationException("Usage Manager Memory Limit reached on " 651 + getActiveMQDestination().getQualifiedName() + "." 652 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 653 } 654 } 655 } 656 throw resourceAllocationException; 657 } 658 659 // We can avoid blocking due to low usage if the producer is 660 // sending 661 // a sync message or if it is using a producer window 662 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 663 // copy the exchange state since the context will be 664 // modified while we are waiting 665 // for space. 666 final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); 667 synchronized (messagesWaitingForSpace) { 668 // Start flow control timeout task 669 // Prevent trying to start it multiple times 670 if (!flowControlTimeoutTask.isAlive()) { 671 flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task"); 672 flowControlTimeoutTask.start(); 673 } 674 messagesWaitingForSpace.put(message.getMessageId(), new Runnable() { 675 @Override 676 public void run() { 677 678 try { 679 // While waiting for space to free up... the 680 // transaction may be done 681 if (message.isInTransaction()) { 682 if (context.getTransaction().getState() > IN_USE_STATE) { 683 throw new JMSException("Send transaction completed while waiting for space"); 684 } 685 } 686 687 // the message may have expired. 688 if (message.isExpired()) { 689 LOG.error("message expired waiting for space"); 690 broker.messageExpired(context, message, null); 691 destinationStatistics.getExpired().increment(); 692 } else { 693 doMessageSend(producerExchangeCopy, message); 694 } 695 696 if (sendProducerAck) { 697 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 698 .getSize()); 699 context.getConnection().dispatchAsync(ack); 700 } else { 701 Response response = new Response(); 702 response.setCorrelationId(message.getCommandId()); 703 context.getConnection().dispatchAsync(response); 704 } 705 706 } catch (Exception e) { 707 if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) { 708 ExceptionResponse response = new ExceptionResponse(e); 709 response.setCorrelationId(message.getCommandId()); 710 context.getConnection().dispatchAsync(response); 711 } else { 712 LOG.debug("unexpected exception on deferred send of: {}", message, e); 713 } 714 } finally { 715 getDestinationStatistics().getBlockedSends().decrement(); 716 producerExchangeCopy.blockingOnFlowControl(false); 717 } 718 } 719 }); 720 721 getDestinationStatistics().getBlockedSends().increment(); 722 producerExchange.blockingOnFlowControl(true); 723 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 724 flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage 725 .getSendFailIfNoSpaceAfterTimeout())); 726 } 727 728 registerCallbackForNotFullNotification(); 729 context.setDontSendReponse(true); 730 return; 731 } 732 733 } else { 734 735 if (memoryUsage.isFull()) { 736 waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer (" 737 + message.getProducerId() + ") stopped to prevent flooding " 738 + getActiveMQDestination().getQualifiedName() + "." 739 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 740 } 741 742 // The usage manager could have delayed us by the time 743 // we unblock the message could have expired.. 744 if (message.isExpired()) { 745 LOG.debug("Expired message: {}", message); 746 broker.getRoot().messageExpired(context, message, null); 747 return; 748 } 749 } 750 } 751 } 752 doMessageSend(producerExchange, message); 753 if (sendProducerAck) { 754 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 755 context.getConnection().dispatchAsync(ack); 756 } 757 } 758 759 private void registerCallbackForNotFullNotification() { 760 // If the usage manager is not full, then the task will not 761 // get called.. 762 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 763 // so call it directly here. 764 sendMessagesWaitingForSpaceTask.run(); 765 } 766 } 767 768 private final LinkedList<MessageContext> indexOrderedCursorUpdates = new LinkedList<>(); 769 770 @Override 771 public void onAdd(MessageContext messageContext) { 772 synchronized (indexOrderedCursorUpdates) { 773 indexOrderedCursorUpdates.addLast(messageContext); 774 } 775 } 776 777 public void rollbackPendingCursorAdditions(MessageId messageId) { 778 synchronized (indexOrderedCursorUpdates) { 779 for (int i = indexOrderedCursorUpdates.size() - 1; i >= 0; i--) { 780 MessageContext mc = indexOrderedCursorUpdates.get(i); 781 if (mc.message.getMessageId().equals(messageId)) { 782 indexOrderedCursorUpdates.remove(mc); 783 if (mc.onCompletion != null) { 784 mc.onCompletion.run(); 785 } 786 break; 787 } 788 } 789 } 790 } 791 792 private void doPendingCursorAdditions() throws Exception { 793 LinkedList<MessageContext> orderedUpdates = new LinkedList<>(); 794 sendLock.lockInterruptibly(); 795 try { 796 synchronized (indexOrderedCursorUpdates) { 797 MessageContext candidate = indexOrderedCursorUpdates.peek(); 798 while (candidate != null && candidate.message.getMessageId().getFutureOrSequenceLong() != null) { 799 candidate = indexOrderedCursorUpdates.removeFirst(); 800 // check for duplicate adds suppressed by the store 801 if (candidate.message.getMessageId().getFutureOrSequenceLong() instanceof Long && ((Long)candidate.message.getMessageId().getFutureOrSequenceLong()).compareTo(-1l) == 0) { 802 LOG.warn("{} messageStore indicated duplicate add attempt for {}, suppressing duplicate dispatch", this, candidate.message.getMessageId()); 803 } else { 804 orderedUpdates.add(candidate); 805 } 806 candidate = indexOrderedCursorUpdates.peek(); 807 } 808 } 809 messagesLock.writeLock().lock(); 810 try { 811 for (MessageContext messageContext : orderedUpdates) { 812 if (!messages.addMessageLast(messageContext.message)) { 813 // cursor suppressed a duplicate 814 messageContext.duplicate = true; 815 } 816 if (messageContext.onCompletion != null) { 817 messageContext.onCompletion.run(); 818 } 819 } 820 } finally { 821 messagesLock.writeLock().unlock(); 822 } 823 } finally { 824 sendLock.unlock(); 825 } 826 for (MessageContext messageContext : orderedUpdates) { 827 if (!messageContext.duplicate) { 828 messageSent(messageContext.context, messageContext.message); 829 } 830 } 831 orderedUpdates.clear(); 832 } 833 834 final class CursorAddSync extends Synchronization { 835 836 private final MessageContext messageContext; 837 838 CursorAddSync(MessageContext messageContext) { 839 this.messageContext = messageContext; 840 this.messageContext.message.incrementReferenceCount(); 841 } 842 843 @Override 844 public void afterCommit() throws Exception { 845 if (store != null && messageContext.message.isPersistent()) { 846 doPendingCursorAdditions(); 847 } else { 848 cursorAdd(messageContext.message); 849 messageSent(messageContext.context, messageContext.message); 850 } 851 messageContext.message.decrementReferenceCount(); 852 } 853 854 @Override 855 public void afterRollback() throws Exception { 856 if (store != null && messageContext.message.isPersistent()) { 857 rollbackPendingCursorAdditions(messageContext.message.getMessageId()); 858 } 859 messageContext.message.decrementReferenceCount(); 860 } 861 } 862 863 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, 864 Exception { 865 final ConnectionContext context = producerExchange.getConnectionContext(); 866 ListenableFuture<Object> result = null; 867 868 producerExchange.incrementSend(); 869 pendingSends.incrementAndGet(); 870 checkUsage(context, producerExchange, message); 871 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 872 if (store != null && message.isPersistent()) { 873 message.getMessageId().setFutureOrSequenceLong(null); 874 try { 875 //AMQ-6133 - don't store async if using persistJMSRedelivered 876 //This flag causes a sync update later on dispatch which can cause a race 877 //condition if the original add is processed after the update, which can cause 878 //a duplicate message to be stored 879 if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) { 880 result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); 881 result.addListener(new PendingMarshalUsageTracker(message)); 882 } else { 883 store.addMessage(context, message); 884 } 885 if (isReduceMemoryFootprint()) { 886 message.clearMarshalledState(); 887 } 888 } catch (Exception e) { 889 // we may have a store in inconsistent state, so reset the cursor 890 // before restarting normal broker operations 891 resetNeeded = true; 892 pendingSends.decrementAndGet(); 893 rollbackPendingCursorAdditions(message.getMessageId()); 894 throw e; 895 } 896 } 897 orderedCursorAdd(message, context); 898 if (result != null && message.isResponseRequired() && !result.isCancelled()) { 899 try { 900 result.get(); 901 } catch (CancellationException e) { 902 // ignore - the task has been cancelled if the message 903 // has already been deleted 904 } 905 } 906 } 907 908 private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception { 909 if (context.isInTransaction()) { 910 context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null))); 911 } else if (store != null && message.isPersistent()) { 912 doPendingCursorAdditions(); 913 } else { 914 // no ordering issue with non persistent messages 915 cursorAdd(message); 916 messageSent(context, message); 917 } 918 } 919 920 private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException { 921 if (message.isPersistent()) { 922 if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 923 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 924 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" 925 + message.getProducerId() + ") to prevent flooding " 926 + getActiveMQDestination().getQualifiedName() + "." 927 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 928 929 waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 930 } 931 } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) { 932 final String logMessage = "Temp Store is Full (" 933 + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit() 934 +"). Stopping producer (" + message.getProducerId() 935 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 936 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 937 938 waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage); 939 } 940 } 941 942 private void expireMessages() { 943 LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName()); 944 945 // just track the insertion count 946 List<Message> browsedMessages = new InsertionCountList<Message>(); 947 doBrowse(browsedMessages, this.getMaxExpirePageSize()); 948 asyncWakeup(); 949 LOG.debug("{} expiring messages done.", getActiveMQDestination().getQualifiedName()); 950 } 951 952 @Override 953 public void gc() { 954 } 955 956 @Override 957 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) 958 throws IOException { 959 messageConsumed(context, node); 960 if (store != null && node.isPersistent()) { 961 store.removeAsyncMessage(context, convertToNonRangedAck(ack, node)); 962 } 963 } 964 965 Message loadMessage(MessageId messageId) throws IOException { 966 Message msg = null; 967 if (store != null) { // can be null for a temp q 968 msg = store.getMessage(messageId); 969 if (msg != null) { 970 msg.setRegionDestination(this); 971 } 972 } 973 return msg; 974 } 975 976 @Override 977 public String toString() { 978 return destination.getQualifiedName() + ", subscriptions=" + consumers.size() 979 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + destinationStatistics.getMessages().getCount() + ", pending=" 980 + indexOrderedCursorUpdates.size(); 981 } 982 983 @Override 984 public void start() throws Exception { 985 if (started.compareAndSet(false, true)) { 986 if (memoryUsage != null) { 987 memoryUsage.start(); 988 } 989 if (systemUsage.getStoreUsage() != null) { 990 systemUsage.getStoreUsage().start(); 991 } 992 systemUsage.getMemoryUsage().addUsageListener(this); 993 messages.start(); 994 if (getExpireMessagesPeriod() > 0) { 995 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 996 } 997 doPageIn(false); 998 } 999 } 1000 1001 @Override 1002 public void stop() throws Exception { 1003 if (started.compareAndSet(true, false)) { 1004 if (taskRunner != null) { 1005 taskRunner.shutdown(); 1006 } 1007 if (this.executor != null) { 1008 ThreadPoolUtils.shutdownNow(executor); 1009 executor = null; 1010 } 1011 1012 scheduler.cancel(expireMessagesTask); 1013 1014 if (flowControlTimeoutTask.isAlive()) { 1015 flowControlTimeoutTask.interrupt(); 1016 } 1017 1018 if (messages != null) { 1019 messages.stop(); 1020 } 1021 1022 for (MessageReference messageReference : pagedInMessages.values()) { 1023 messageReference.decrementReferenceCount(); 1024 } 1025 pagedInMessages.clear(); 1026 1027 systemUsage.getMemoryUsage().removeUsageListener(this); 1028 if (memoryUsage != null) { 1029 memoryUsage.stop(); 1030 } 1031 if (store != null) { 1032 store.stop(); 1033 } 1034 } 1035 } 1036 1037 // Properties 1038 // ------------------------------------------------------------------------- 1039 @Override 1040 public ActiveMQDestination getActiveMQDestination() { 1041 return destination; 1042 } 1043 1044 public MessageGroupMap getMessageGroupOwners() { 1045 if (messageGroupOwners == null) { 1046 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); 1047 messageGroupOwners.setDestination(this); 1048 } 1049 return messageGroupOwners; 1050 } 1051 1052 public DispatchPolicy getDispatchPolicy() { 1053 return dispatchPolicy; 1054 } 1055 1056 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 1057 this.dispatchPolicy = dispatchPolicy; 1058 } 1059 1060 public MessageGroupMapFactory getMessageGroupMapFactory() { 1061 return messageGroupMapFactory; 1062 } 1063 1064 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 1065 this.messageGroupMapFactory = messageGroupMapFactory; 1066 } 1067 1068 public PendingMessageCursor getMessages() { 1069 return this.messages; 1070 } 1071 1072 public void setMessages(PendingMessageCursor messages) { 1073 this.messages = messages; 1074 } 1075 1076 public boolean isUseConsumerPriority() { 1077 return useConsumerPriority; 1078 } 1079 1080 public void setUseConsumerPriority(boolean useConsumerPriority) { 1081 this.useConsumerPriority = useConsumerPriority; 1082 } 1083 1084 public boolean isStrictOrderDispatch() { 1085 return strictOrderDispatch; 1086 } 1087 1088 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 1089 this.strictOrderDispatch = strictOrderDispatch; 1090 } 1091 1092 public boolean isOptimizedDispatch() { 1093 return optimizedDispatch; 1094 } 1095 1096 public void setOptimizedDispatch(boolean optimizedDispatch) { 1097 this.optimizedDispatch = optimizedDispatch; 1098 } 1099 1100 public int getTimeBeforeDispatchStarts() { 1101 return timeBeforeDispatchStarts; 1102 } 1103 1104 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 1105 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 1106 } 1107 1108 public int getConsumersBeforeDispatchStarts() { 1109 return consumersBeforeDispatchStarts; 1110 } 1111 1112 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 1113 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 1114 } 1115 1116 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 1117 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 1118 } 1119 1120 public boolean isAllConsumersExclusiveByDefault() { 1121 return allConsumersExclusiveByDefault; 1122 } 1123 1124 public boolean isResetNeeded() { 1125 return resetNeeded; 1126 } 1127 1128 // Implementation methods 1129 // ------------------------------------------------------------------------- 1130 private QueueMessageReference createMessageReference(Message message) { 1131 QueueMessageReference result = new IndirectMessageReference(message); 1132 return result; 1133 } 1134 1135 @Override 1136 public Message[] browse() { 1137 List<Message> browseList = new ArrayList<Message>(); 1138 doBrowse(browseList, getMaxBrowsePageSize()); 1139 return browseList.toArray(new Message[browseList.size()]); 1140 } 1141 1142 public void doBrowse(List<Message> browseList, int max) { 1143 final ConnectionContext connectionContext = createConnectionContext(); 1144 try { 1145 int maxPageInAttempts = 1; 1146 if (max > 0) { 1147 messagesLock.readLock().lock(); 1148 try { 1149 maxPageInAttempts += (messages.size() / max); 1150 } finally { 1151 messagesLock.readLock().unlock(); 1152 } 1153 while (shouldPageInMoreForBrowse(max) && maxPageInAttempts-- > 0) { 1154 pageInMessages(!memoryUsage.isFull(110), max); 1155 } 1156 } 1157 doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch"); 1158 doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages"); 1159 1160 // we need a store iterator to walk messages on disk, independent of the cursor which is tracking 1161 // the next message batch 1162 } catch (Exception e) { 1163 LOG.error("Problem retrieving message for browse", e); 1164 } 1165 } 1166 1167 protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception { 1168 List<MessageReference> toExpire = new ArrayList<MessageReference>(); 1169 lock.readLock().lock(); 1170 try { 1171 addAll(list.values(), browseList, max, toExpire); 1172 } finally { 1173 lock.readLock().unlock(); 1174 } 1175 for (MessageReference ref : toExpire) { 1176 if (broker.isExpired(ref)) { 1177 LOG.debug("expiring from {}: {}", name, ref); 1178 messageExpired(connectionContext, ref); 1179 } else { 1180 lock.writeLock().lock(); 1181 try { 1182 list.remove(ref); 1183 } finally { 1184 lock.writeLock().unlock(); 1185 } 1186 ref.decrementReferenceCount(); 1187 } 1188 } 1189 } 1190 1191 private boolean shouldPageInMoreForBrowse(int max) { 1192 int alreadyPagedIn = 0; 1193 pagedInMessagesLock.readLock().lock(); 1194 try { 1195 alreadyPagedIn = pagedInMessages.size(); 1196 } finally { 1197 pagedInMessagesLock.readLock().unlock(); 1198 } 1199 int messagesInQueue = alreadyPagedIn; 1200 messagesLock.readLock().lock(); 1201 try { 1202 messagesInQueue += messages.size(); 1203 } finally { 1204 messagesLock.readLock().unlock(); 1205 } 1206 1207 LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()}); 1208 return (alreadyPagedIn < max) 1209 && (alreadyPagedIn < messagesInQueue) 1210 && messages.hasSpace(); 1211 } 1212 1213 private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max, 1214 List<MessageReference> toExpire) throws Exception { 1215 for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) { 1216 QueueMessageReference ref = (QueueMessageReference) i.next(); 1217 if (ref.isExpired() && (ref.getLockOwner() == null)) { 1218 toExpire.add(ref); 1219 } else if (!ref.isAcked() && l.contains(ref.getMessage()) == false) { 1220 l.add(ref.getMessage()); 1221 } 1222 } 1223 } 1224 1225 public QueueMessageReference getMessage(String id) { 1226 MessageId msgId = new MessageId(id); 1227 pagedInMessagesLock.readLock().lock(); 1228 try { 1229 QueueMessageReference ref = (QueueMessageReference)this.pagedInMessages.get(msgId); 1230 if (ref != null) { 1231 return ref; 1232 } 1233 } finally { 1234 pagedInMessagesLock.readLock().unlock(); 1235 } 1236 messagesLock.writeLock().lock(); 1237 try{ 1238 try { 1239 messages.reset(); 1240 while (messages.hasNext()) { 1241 MessageReference mr = messages.next(); 1242 QueueMessageReference qmr = createMessageReference(mr.getMessage()); 1243 qmr.decrementReferenceCount(); 1244 messages.rollback(qmr.getMessageId()); 1245 if (msgId.equals(qmr.getMessageId())) { 1246 return qmr; 1247 } 1248 } 1249 } finally { 1250 messages.release(); 1251 } 1252 }finally { 1253 messagesLock.writeLock().unlock(); 1254 } 1255 return null; 1256 } 1257 1258 public void purge() throws Exception { 1259 ConnectionContext c = createConnectionContext(); 1260 List<MessageReference> list = null; 1261 try { 1262 sendLock.lock(); 1263 long originalMessageCount = this.destinationStatistics.getMessages().getCount(); 1264 do { 1265 doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. 1266 pagedInMessagesLock.readLock().lock(); 1267 try { 1268 list = new ArrayList<MessageReference>(pagedInMessages.values()); 1269 }finally { 1270 pagedInMessagesLock.readLock().unlock(); 1271 } 1272 1273 for (MessageReference ref : list) { 1274 try { 1275 QueueMessageReference r = (QueueMessageReference) ref; 1276 removeMessage(c, r); 1277 } catch (IOException e) { 1278 } 1279 } 1280 // don't spin/hang if stats are out and there is nothing left in the 1281 // store 1282 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); 1283 1284 if (getMessages().getMessageAudit() != null) { 1285 getMessages().getMessageAudit().clear(); 1286 } 1287 1288 if (this.destinationStatistics.getMessages().getCount() > 0) { 1289 LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); 1290 } 1291 } finally { 1292 sendLock.unlock(); 1293 } 1294 } 1295 1296 @Override 1297 public void clearPendingMessages(int pendingAdditionsCount) { 1298 messagesLock.writeLock().lock(); 1299 try { 1300 final ActiveMQMessage dummyPersistent = new ActiveMQMessage(); 1301 dummyPersistent.setPersistent(true); 1302 for (int i=0; i<pendingAdditionsCount; i++) { 1303 try { 1304 // track the increase in the cursor size w/o reverting to the store 1305 messages.addMessageFirst(dummyPersistent); 1306 } catch (Exception ignored) { 1307 LOG.debug("Unexpected exception on tracking pending message additions", ignored); 1308 } 1309 } 1310 if (resetNeeded) { 1311 messages.gc(); 1312 messages.reset(); 1313 resetNeeded = false; 1314 } else { 1315 messages.rebase(); 1316 } 1317 asyncWakeup(); 1318 } finally { 1319 messagesLock.writeLock().unlock(); 1320 } 1321 } 1322 1323 /** 1324 * Removes the message matching the given messageId 1325 */ 1326 public boolean removeMessage(String messageId) throws Exception { 1327 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0; 1328 } 1329 1330 /** 1331 * Removes the messages matching the given selector 1332 * 1333 * @return the number of messages removed 1334 */ 1335 public int removeMatchingMessages(String selector) throws Exception { 1336 return removeMatchingMessages(selector, -1); 1337 } 1338 1339 /** 1340 * Removes the messages matching the given selector up to the maximum number 1341 * of matched messages 1342 * 1343 * @return the number of messages removed 1344 */ 1345 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 1346 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages); 1347 } 1348 1349 /** 1350 * Removes the messages matching the given filter up to the maximum number 1351 * of matched messages 1352 * 1353 * @return the number of messages removed 1354 */ 1355 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { 1356 int movedCounter = 0; 1357 Set<MessageReference> set = new LinkedHashSet<MessageReference>(); 1358 ConnectionContext context = createConnectionContext(); 1359 do { 1360 doPageIn(true); 1361 pagedInMessagesLock.readLock().lock(); 1362 try { 1363 set.addAll(pagedInMessages.values()); 1364 } finally { 1365 pagedInMessagesLock.readLock().unlock(); 1366 } 1367 List<MessageReference> list = new ArrayList<MessageReference>(set); 1368 for (MessageReference ref : list) { 1369 IndirectMessageReference r = (IndirectMessageReference) ref; 1370 if (filter.evaluate(context, r)) { 1371 1372 removeMessage(context, r); 1373 set.remove(r); 1374 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1375 return movedCounter; 1376 } 1377 } 1378 } 1379 } while (set.size() < this.destinationStatistics.getMessages().getCount()); 1380 return movedCounter; 1381 } 1382 1383 /** 1384 * Copies the message matching the given messageId 1385 */ 1386 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) 1387 throws Exception { 1388 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0; 1389 } 1390 1391 /** 1392 * Copies the messages matching the given selector 1393 * 1394 * @return the number of messages copied 1395 */ 1396 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) 1397 throws Exception { 1398 return copyMatchingMessagesTo(context, selector, dest, -1); 1399 } 1400 1401 /** 1402 * Copies the messages matching the given selector up to the maximum number 1403 * of matched messages 1404 * 1405 * @return the number of messages copied 1406 */ 1407 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, 1408 int maximumMessages) throws Exception { 1409 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages); 1410 } 1411 1412 /** 1413 * Copies the messages matching the given filter up to the maximum number of 1414 * matched messages 1415 * 1416 * @return the number of messages copied 1417 */ 1418 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, 1419 int maximumMessages) throws Exception { 1420 1421 if (destination.equals(dest)) { 1422 return 0; 1423 } 1424 1425 int movedCounter = 0; 1426 int count = 0; 1427 Set<MessageReference> set = new LinkedHashSet<MessageReference>(); 1428 do { 1429 int oldMaxSize = getMaxPageSize(); 1430 setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); 1431 doPageIn(true); 1432 setMaxPageSize(oldMaxSize); 1433 pagedInMessagesLock.readLock().lock(); 1434 try { 1435 set.addAll(pagedInMessages.values()); 1436 } finally { 1437 pagedInMessagesLock.readLock().unlock(); 1438 } 1439 List<MessageReference> list = new ArrayList<MessageReference>(set); 1440 for (MessageReference ref : list) { 1441 IndirectMessageReference r = (IndirectMessageReference) ref; 1442 if (filter.evaluate(context, r)) { 1443 1444 r.incrementReferenceCount(); 1445 try { 1446 Message m = r.getMessage(); 1447 BrokerSupport.resend(context, m, dest); 1448 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1449 return movedCounter; 1450 } 1451 } finally { 1452 r.decrementReferenceCount(); 1453 } 1454 } 1455 count++; 1456 } 1457 } while (count < this.destinationStatistics.getMessages().getCount()); 1458 return movedCounter; 1459 } 1460 1461 /** 1462 * Move a message 1463 * 1464 * @param context 1465 * connection context 1466 * @param m 1467 * QueueMessageReference 1468 * @param dest 1469 * ActiveMQDestination 1470 * @throws Exception 1471 */ 1472 public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception { 1473 BrokerSupport.resend(context, m.getMessage(), dest); 1474 removeMessage(context, m); 1475 messagesLock.writeLock().lock(); 1476 try { 1477 messages.rollback(m.getMessageId()); 1478 if (isDLQ()) { 1479 DeadLetterStrategy stratagy = getDeadLetterStrategy(); 1480 stratagy.rollback(m.getMessage()); 1481 } 1482 } finally { 1483 messagesLock.writeLock().unlock(); 1484 } 1485 return true; 1486 } 1487 1488 /** 1489 * Moves the message matching the given messageId 1490 */ 1491 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) 1492 throws Exception { 1493 return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0; 1494 } 1495 1496 /** 1497 * Moves the messages matching the given selector 1498 * 1499 * @return the number of messages removed 1500 */ 1501 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) 1502 throws Exception { 1503 return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE); 1504 } 1505 1506 /** 1507 * Moves the messages matching the given selector up to the maximum number 1508 * of matched messages 1509 */ 1510 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, 1511 int maximumMessages) throws Exception { 1512 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages); 1513 } 1514 1515 /** 1516 * Moves the messages matching the given filter up to the maximum number of 1517 * matched messages 1518 */ 1519 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, 1520 ActiveMQDestination dest, int maximumMessages) throws Exception { 1521 1522 if (destination.equals(dest)) { 1523 return 0; 1524 } 1525 1526 int movedCounter = 0; 1527 Set<MessageReference> set = new LinkedHashSet<MessageReference>(); 1528 do { 1529 doPageIn(true); 1530 pagedInMessagesLock.readLock().lock(); 1531 try { 1532 set.addAll(pagedInMessages.values()); 1533 } finally { 1534 pagedInMessagesLock.readLock().unlock(); 1535 } 1536 List<MessageReference> list = new ArrayList<MessageReference>(set); 1537 for (MessageReference ref : list) { 1538 if (filter.evaluate(context, ref)) { 1539 // We should only move messages that can be locked. 1540 moveMessageTo(context, (QueueMessageReference)ref, dest); 1541 set.remove(ref); 1542 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1543 return movedCounter; 1544 } 1545 } 1546 } 1547 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); 1548 return movedCounter; 1549 } 1550 1551 public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception { 1552 if (!isDLQ()) { 1553 throw new Exception("Retry of message is only possible on Dead Letter Queues!"); 1554 } 1555 int restoredCounter = 0; 1556 Set<MessageReference> set = new LinkedHashSet<MessageReference>(); 1557 do { 1558 doPageIn(true); 1559 pagedInMessagesLock.readLock().lock(); 1560 try { 1561 set.addAll(pagedInMessages.values()); 1562 } finally { 1563 pagedInMessagesLock.readLock().unlock(); 1564 } 1565 List<MessageReference> list = new ArrayList<MessageReference>(set); 1566 for (MessageReference ref : list) { 1567 if (ref.getMessage().getOriginalDestination() != null) { 1568 1569 moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination()); 1570 set.remove(ref); 1571 if (++restoredCounter >= maximumMessages && maximumMessages > 0) { 1572 return restoredCounter; 1573 } 1574 } 1575 } 1576 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); 1577 return restoredCounter; 1578 } 1579 1580 /** 1581 * @return true if we would like to iterate again 1582 * @see org.apache.activemq.thread.Task#iterate() 1583 */ 1584 @Override 1585 public boolean iterate() { 1586 MDC.put("activemq.destination", getName()); 1587 boolean pageInMoreMessages = false; 1588 synchronized (iteratingMutex) { 1589 1590 // If optimize dispatch is on or this is a slave this method could be called recursively 1591 // we set this state value to short-circuit wakeup in those cases to avoid that as it 1592 // could lead to errors. 1593 iterationRunning = true; 1594 1595 // do early to allow dispatch of these waiting messages 1596 synchronized (messagesWaitingForSpace) { 1597 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator(); 1598 while (it.hasNext()) { 1599 if (!memoryUsage.isFull()) { 1600 Runnable op = it.next(); 1601 it.remove(); 1602 op.run(); 1603 } else { 1604 registerCallbackForNotFullNotification(); 1605 break; 1606 } 1607 } 1608 } 1609 1610 if (firstConsumer) { 1611 firstConsumer = false; 1612 try { 1613 if (consumersBeforeDispatchStarts > 0) { 1614 int timeout = 1000; // wait one second by default if 1615 // consumer count isn't reached 1616 if (timeBeforeDispatchStarts > 0) { 1617 timeout = timeBeforeDispatchStarts; 1618 } 1619 if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) { 1620 LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size()); 1621 } else { 1622 LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size()); 1623 } 1624 } 1625 if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) { 1626 iteratingMutex.wait(timeBeforeDispatchStarts); 1627 LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts); 1628 } 1629 } catch (Exception e) { 1630 LOG.error(e.toString()); 1631 } 1632 } 1633 1634 messagesLock.readLock().lock(); 1635 try{ 1636 pageInMoreMessages |= !messages.isEmpty(); 1637 } finally { 1638 messagesLock.readLock().unlock(); 1639 } 1640 1641 pagedInPendingDispatchLock.readLock().lock(); 1642 try { 1643 pageInMoreMessages |= !dispatchPendingList.isEmpty(); 1644 } finally { 1645 pagedInPendingDispatchLock.readLock().unlock(); 1646 } 1647 1648 boolean hasBrowsers = !browserDispatches.isEmpty(); 1649 1650 if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { 1651 try { 1652 pageInMessages(hasBrowsers && getMaxBrowsePageSize() > 0, getMaxPageSize()); 1653 } catch (Throwable e) { 1654 LOG.error("Failed to page in more queue messages ", e); 1655 } 1656 } 1657 1658 if (hasBrowsers) { 1659 PendingList messagesInMemory = isPrioritizedMessages() ? 1660 new PrioritizedPendingList() : new OrderedPendingList(); 1661 pagedInMessagesLock.readLock().lock(); 1662 try { 1663 messagesInMemory.addAll(pagedInMessages); 1664 } finally { 1665 pagedInMessagesLock.readLock().unlock(); 1666 } 1667 1668 Iterator<BrowserDispatch> browsers = browserDispatches.iterator(); 1669 while (browsers.hasNext()) { 1670 BrowserDispatch browserDispatch = browsers.next(); 1671 try { 1672 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 1673 msgContext.setDestination(destination); 1674 1675 QueueBrowserSubscription browser = browserDispatch.getBrowser(); 1676 1677 LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size()); 1678 boolean added = false; 1679 for (MessageReference node : messagesInMemory) { 1680 if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { 1681 msgContext.setMessageReference(node); 1682 if (browser.matches(node, msgContext)) { 1683 browser.add(node); 1684 added = true; 1685 } 1686 } 1687 } 1688 // are we done browsing? no new messages paged 1689 if (!added || browser.atMax()) { 1690 browser.decrementQueueRef(); 1691 browserDispatches.remove(browserDispatch); 1692 } else { 1693 wakeup(); 1694 } 1695 } catch (Exception e) { 1696 LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e); 1697 } 1698 } 1699 } 1700 1701 if (pendingWakeups.get() > 0) { 1702 pendingWakeups.decrementAndGet(); 1703 } 1704 MDC.remove("activemq.destination"); 1705 iterationRunning = false; 1706 1707 return pendingWakeups.get() > 0; 1708 } 1709 } 1710 1711 public void pauseDispatch() { 1712 dispatchSelector.pause(); 1713 } 1714 1715 public void resumeDispatch() { 1716 dispatchSelector.resume(); 1717 wakeup(); 1718 } 1719 1720 public boolean isDispatchPaused() { 1721 return dispatchSelector.isPaused(); 1722 } 1723 1724 protected MessageReferenceFilter createMessageIdFilter(final String messageId) { 1725 return new MessageReferenceFilter() { 1726 @Override 1727 public boolean evaluate(ConnectionContext context, MessageReference r) { 1728 return messageId.equals(r.getMessageId().toString()); 1729 } 1730 1731 @Override 1732 public String toString() { 1733 return "MessageIdFilter: " + messageId; 1734 } 1735 }; 1736 } 1737 1738 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException { 1739 1740 if (selector == null || selector.isEmpty()) { 1741 return new MessageReferenceFilter() { 1742 1743 @Override 1744 public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException { 1745 return true; 1746 } 1747 }; 1748 } 1749 1750 final BooleanExpression selectorExpression = SelectorParser.parse(selector); 1751 1752 return new MessageReferenceFilter() { 1753 @Override 1754 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException { 1755 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext(); 1756 1757 messageEvaluationContext.setMessageReference(r); 1758 if (messageEvaluationContext.getDestination() == null) { 1759 messageEvaluationContext.setDestination(getActiveMQDestination()); 1760 } 1761 1762 return selectorExpression.matches(messageEvaluationContext); 1763 } 1764 }; 1765 } 1766 1767 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException { 1768 removeMessage(c, null, r); 1769 pagedInPendingDispatchLock.writeLock().lock(); 1770 try { 1771 dispatchPendingList.remove(r); 1772 } finally { 1773 pagedInPendingDispatchLock.writeLock().unlock(); 1774 } 1775 } 1776 1777 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException { 1778 MessageAck ack = new MessageAck(); 1779 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 1780 ack.setDestination(destination); 1781 ack.setMessageID(r.getMessageId()); 1782 removeMessage(c, subs, r, ack); 1783 } 1784 1785 protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, 1786 MessageAck ack) throws IOException { 1787 LOG.trace("ack of {} with {}", reference.getMessageId(), ack); 1788 // This sends the ack the the journal.. 1789 if (!ack.isInTransaction()) { 1790 acknowledge(context, sub, ack, reference); 1791 getDestinationStatistics().getDequeues().increment(); 1792 dropMessage(reference); 1793 } else { 1794 try { 1795 acknowledge(context, sub, ack, reference); 1796 } finally { 1797 context.getTransaction().addSynchronization(new Synchronization() { 1798 1799 @Override 1800 public void afterCommit() throws Exception { 1801 getDestinationStatistics().getDequeues().increment(); 1802 dropMessage(reference); 1803 wakeup(); 1804 } 1805 1806 @Override 1807 public void afterRollback() throws Exception { 1808 reference.setAcked(false); 1809 wakeup(); 1810 } 1811 }); 1812 } 1813 } 1814 if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) { 1815 // message gone to DLQ, is ok to allow redelivery 1816 messagesLock.writeLock().lock(); 1817 try { 1818 messages.rollback(reference.getMessageId()); 1819 } finally { 1820 messagesLock.writeLock().unlock(); 1821 } 1822 if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) { 1823 getDestinationStatistics().getForwards().increment(); 1824 } 1825 } 1826 // after successful store update 1827 reference.setAcked(true); 1828 } 1829 1830 private void dropMessage(QueueMessageReference reference) { 1831 if (!reference.isDropped()) { 1832 reference.drop(); 1833 destinationStatistics.getMessages().decrement(); 1834 pagedInMessagesLock.writeLock().lock(); 1835 try { 1836 pagedInMessages.remove(reference); 1837 } finally { 1838 pagedInMessagesLock.writeLock().unlock(); 1839 } 1840 } 1841 } 1842 1843 public void messageExpired(ConnectionContext context, MessageReference reference) { 1844 messageExpired(context, null, reference); 1845 } 1846 1847 @Override 1848 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 1849 LOG.debug("message expired: {}", reference); 1850 broker.messageExpired(context, reference, subs); 1851 destinationStatistics.getExpired().increment(); 1852 try { 1853 removeMessage(context, subs, (QueueMessageReference) reference); 1854 messagesLock.writeLock().lock(); 1855 try { 1856 messages.rollback(reference.getMessageId()); 1857 } finally { 1858 messagesLock.writeLock().unlock(); 1859 } 1860 } catch (IOException e) { 1861 LOG.error("Failed to remove expired Message from the store ", e); 1862 } 1863 } 1864 1865 final boolean cursorAdd(final Message msg) throws Exception { 1866 messagesLock.writeLock().lock(); 1867 try { 1868 return messages.addMessageLast(msg); 1869 } finally { 1870 messagesLock.writeLock().unlock(); 1871 } 1872 } 1873 1874 final void messageSent(final ConnectionContext context, final Message msg) throws Exception { 1875 pendingSends.decrementAndGet(); 1876 destinationStatistics.getEnqueues().increment(); 1877 destinationStatistics.getMessages().increment(); 1878 destinationStatistics.getMessageSize().addSize(msg.getSize()); 1879 messageDelivered(context, msg); 1880 consumersLock.readLock().lock(); 1881 try { 1882 if (consumers.isEmpty()) { 1883 onMessageWithNoConsumers(context, msg); 1884 } 1885 }finally { 1886 consumersLock.readLock().unlock(); 1887 } 1888 LOG.debug("{} Message {} sent to {}", new Object[]{ broker.getBrokerName(), msg.getMessageId(), this.destination }); 1889 wakeup(); 1890 } 1891 1892 @Override 1893 public void wakeup() { 1894 if (optimizedDispatch && !iterationRunning) { 1895 iterate(); 1896 pendingWakeups.incrementAndGet(); 1897 } else { 1898 asyncWakeup(); 1899 } 1900 } 1901 1902 private void asyncWakeup() { 1903 try { 1904 pendingWakeups.incrementAndGet(); 1905 this.taskRunner.wakeup(); 1906 } catch (InterruptedException e) { 1907 LOG.warn("Async task runner failed to wakeup ", e); 1908 } 1909 } 1910 1911 private void doPageIn(boolean force) throws Exception { 1912 doPageIn(force, true, getMaxPageSize()); 1913 } 1914 1915 private void doPageIn(boolean force, boolean processExpired, int maxPageSize) throws Exception { 1916 PendingList newlyPaged = doPageInForDispatch(force, processExpired, maxPageSize); 1917 pagedInPendingDispatchLock.writeLock().lock(); 1918 try { 1919 if (dispatchPendingList.isEmpty()) { 1920 dispatchPendingList.addAll(newlyPaged); 1921 1922 } else { 1923 for (MessageReference qmr : newlyPaged) { 1924 if (!dispatchPendingList.contains(qmr)) { 1925 dispatchPendingList.addMessageLast(qmr); 1926 } 1927 } 1928 } 1929 } finally { 1930 pagedInPendingDispatchLock.writeLock().unlock(); 1931 } 1932 } 1933 1934 private PendingList doPageInForDispatch(boolean force, boolean processExpired, int maxPageSize) throws Exception { 1935 List<QueueMessageReference> result = null; 1936 PendingList resultList = null; 1937 1938 int toPageIn = maxPageSize; 1939 messagesLock.readLock().lock(); 1940 try { 1941 toPageIn = Math.min(toPageIn, messages.size()); 1942 } finally { 1943 messagesLock.readLock().unlock(); 1944 } 1945 int pagedInPendingSize = 0; 1946 pagedInPendingDispatchLock.readLock().lock(); 1947 try { 1948 pagedInPendingSize = dispatchPendingList.size(); 1949 } finally { 1950 pagedInPendingDispatchLock.readLock().unlock(); 1951 } 1952 if (isLazyDispatch() && !force) { 1953 // Only page in the minimum number of messages which can be 1954 // dispatched immediately. 1955 toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull()); 1956 } 1957 1958 if (LOG.isDebugEnabled()) { 1959 LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}", 1960 new Object[]{ 1961 this, 1962 toPageIn, 1963 force, 1964 destinationStatistics.getInflight().getCount(), 1965 pagedInMessages.size(), 1966 pagedInPendingSize, 1967 destinationStatistics.getEnqueues().getCount(), 1968 destinationStatistics.getDequeues().getCount(), 1969 getMemoryUsage().getUsage(), 1970 maxPageSize 1971 }); 1972 } 1973 1974 if (toPageIn > 0 && (force || (haveRealConsumer() && pagedInPendingSize < maxPageSize))) { 1975 int count = 0; 1976 result = new ArrayList<QueueMessageReference>(toPageIn); 1977 messagesLock.writeLock().lock(); 1978 try { 1979 try { 1980 messages.setMaxBatchSize(toPageIn); 1981 messages.reset(); 1982 while (count < toPageIn && messages.hasNext()) { 1983 MessageReference node = messages.next(); 1984 messages.remove(); 1985 1986 QueueMessageReference ref = createMessageReference(node.getMessage()); 1987 if (processExpired && ref.isExpired()) { 1988 if (broker.isExpired(ref)) { 1989 messageExpired(createConnectionContext(), ref); 1990 } else { 1991 ref.decrementReferenceCount(); 1992 } 1993 } else { 1994 result.add(ref); 1995 count++; 1996 } 1997 } 1998 } finally { 1999 messages.release(); 2000 } 2001 } finally { 2002 messagesLock.writeLock().unlock(); 2003 } 2004 2005 if (count > 0) { 2006 // Only add new messages, not already pagedIn to avoid multiple 2007 // dispatch attempts 2008 pagedInMessagesLock.writeLock().lock(); 2009 try { 2010 if (isPrioritizedMessages()) { 2011 resultList = new PrioritizedPendingList(); 2012 } else { 2013 resultList = new OrderedPendingList(); 2014 } 2015 for (QueueMessageReference ref : result) { 2016 if (!pagedInMessages.contains(ref)) { 2017 pagedInMessages.addMessageLast(ref); 2018 resultList.addMessageLast(ref); 2019 } else { 2020 ref.decrementReferenceCount(); 2021 // store should have trapped duplicate in it's index, or cursor audit trapped insert 2022 // or producerBrokerExchange suppressed send. 2023 // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id 2024 LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); 2025 if (store != null) { 2026 ConnectionContext connectionContext = createConnectionContext(); 2027 dropMessage(ref); 2028 if (gotToTheStore(ref.getMessage())) { 2029 LOG.debug("Duplicate message {} from cursor, removing from store", this, ref.getMessage()); 2030 store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1)); 2031 } 2032 broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination)); 2033 } 2034 } 2035 } 2036 } finally { 2037 pagedInMessagesLock.writeLock().unlock(); 2038 } 2039 } else if (!messages.hasSpace()) { 2040 if (isFlowControlLogRequired()) { 2041 LOG.warn("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); 2042 } else { 2043 LOG.debug("{} cursor blocked, no space available to page in messages; usage: {}", this, this.systemUsage.getMemoryUsage()); 2044 } 2045 } 2046 } else { 2047 // Avoid return null list, if condition is not validated 2048 resultList = new OrderedPendingList(); 2049 } 2050 2051 return resultList; 2052 } 2053 2054 private final boolean haveRealConsumer() { 2055 return consumers.size() - browserDispatches.size() > 0; 2056 } 2057 2058 private void doDispatch(PendingList list) throws Exception { 2059 boolean doWakeUp = false; 2060 2061 pagedInPendingDispatchLock.writeLock().lock(); 2062 try { 2063 if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && list != null && !list.isEmpty()) { 2064 // merge all to select priority order 2065 for (MessageReference qmr : list) { 2066 if (!dispatchPendingList.contains(qmr)) { 2067 dispatchPendingList.addMessageLast(qmr); 2068 } 2069 } 2070 list = null; 2071 } 2072 2073 doActualDispatch(dispatchPendingList); 2074 // and now see if we can dispatch the new stuff.. and append to the pending 2075 // list anything that does not actually get dispatched. 2076 if (list != null && !list.isEmpty()) { 2077 if (dispatchPendingList.isEmpty()) { 2078 dispatchPendingList.addAll(doActualDispatch(list)); 2079 } else { 2080 for (MessageReference qmr : list) { 2081 if (!dispatchPendingList.contains(qmr)) { 2082 dispatchPendingList.addMessageLast(qmr); 2083 } 2084 } 2085 doWakeUp = true; 2086 } 2087 } 2088 } finally { 2089 pagedInPendingDispatchLock.writeLock().unlock(); 2090 } 2091 2092 if (doWakeUp) { 2093 // avoid lock order contention 2094 asyncWakeup(); 2095 } 2096 } 2097 2098 /** 2099 * @return list of messages that could get dispatched to consumers if they 2100 * were not full. 2101 */ 2102 private PendingList doActualDispatch(PendingList list) throws Exception { 2103 List<Subscription> consumers; 2104 consumersLock.readLock().lock(); 2105 2106 try { 2107 if (this.consumers.isEmpty()) { 2108 // slave dispatch happens in processDispatchNotification 2109 return list; 2110 } 2111 consumers = new ArrayList<Subscription>(this.consumers); 2112 } finally { 2113 consumersLock.readLock().unlock(); 2114 } 2115 2116 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); 2117 2118 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 2119 2120 MessageReference node = iterator.next(); 2121 Subscription target = null; 2122 for (Subscription s : consumers) { 2123 if (s instanceof QueueBrowserSubscription) { 2124 continue; 2125 } 2126 if (!fullConsumers.contains(s)) { 2127 if (!s.isFull()) { 2128 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) { 2129 // Dispatch it. 2130 s.add(node); 2131 LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId()); 2132 iterator.remove(); 2133 target = s; 2134 break; 2135 } 2136 } else { 2137 // no further dispatch of list to a full consumer to 2138 // avoid out of order message receipt 2139 fullConsumers.add(s); 2140 LOG.trace("Subscription full {}", s); 2141 } 2142 } 2143 } 2144 2145 if (target == null && node.isDropped()) { 2146 iterator.remove(); 2147 } 2148 2149 // return if there are no consumers or all consumers are full 2150 if (target == null && consumers.size() == fullConsumers.size()) { 2151 return list; 2152 } 2153 2154 // If it got dispatched, rotate the consumer list to get round robin 2155 // distribution. 2156 if (target != null && !strictOrderDispatch && consumers.size() > 1 2157 && !dispatchSelector.isExclusiveConsumer(target)) { 2158 consumersLock.writeLock().lock(); 2159 try { 2160 if (removeFromConsumerList(target)) { 2161 addToConsumerList(target); 2162 consumers = new ArrayList<Subscription>(this.consumers); 2163 } 2164 } finally { 2165 consumersLock.writeLock().unlock(); 2166 } 2167 } 2168 } 2169 2170 return list; 2171 } 2172 2173 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { 2174 boolean result = true; 2175 // Keep message groups together. 2176 String groupId = node.getGroupID(); 2177 int sequence = node.getGroupSequence(); 2178 if (groupId != null) { 2179 2180 MessageGroupMap messageGroupOwners = getMessageGroupOwners(); 2181 // If we can own the first, then no-one else should own the 2182 // rest. 2183 if (sequence == 1) { 2184 assignGroup(subscription, messageGroupOwners, node, groupId); 2185 } else { 2186 2187 // Make sure that the previous owner is still valid, we may 2188 // need to become the new owner. 2189 ConsumerId groupOwner; 2190 2191 groupOwner = messageGroupOwners.get(groupId); 2192 if (groupOwner == null) { 2193 assignGroup(subscription, messageGroupOwners, node, groupId); 2194 } else { 2195 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { 2196 // A group sequence < 1 is an end of group signal. 2197 if (sequence < 0) { 2198 messageGroupOwners.removeGroup(groupId); 2199 subscription.getConsumerInfo().decrementAssignedGroupCount(destination); 2200 } 2201 } else { 2202 result = false; 2203 } 2204 } 2205 } 2206 } 2207 2208 return result; 2209 } 2210 2211 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { 2212 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); 2213 Message message = n.getMessage(); 2214 message.setJMSXGroupFirstForConsumer(true); 2215 subs.getConsumerInfo().incrementAssignedGroupCount(destination); 2216 } 2217 2218 protected void pageInMessages(boolean force, int maxPageSize) throws Exception { 2219 doDispatch(doPageInForDispatch(force, true, maxPageSize)); 2220 } 2221 2222 private void addToConsumerList(Subscription sub) { 2223 if (useConsumerPriority) { 2224 consumers.add(sub); 2225 Collections.sort(consumers, orderedCompare); 2226 } else { 2227 consumers.add(sub); 2228 } 2229 } 2230 2231 private boolean removeFromConsumerList(Subscription sub) { 2232 return consumers.remove(sub); 2233 } 2234 2235 private int getConsumerMessageCountBeforeFull() throws Exception { 2236 int total = 0; 2237 consumersLock.readLock().lock(); 2238 try { 2239 for (Subscription s : consumers) { 2240 if (s.isBrowser()) { 2241 continue; 2242 } 2243 int countBeforeFull = s.countBeforeFull(); 2244 total += countBeforeFull; 2245 } 2246 } finally { 2247 consumersLock.readLock().unlock(); 2248 } 2249 return total; 2250 } 2251 2252 /* 2253 * In slave mode, dispatch is ignored till we get this notification as the 2254 * dispatch process is non deterministic between master and slave. On a 2255 * notification, the actual dispatch to the subscription (as chosen by the 2256 * master) is completed. (non-Javadoc) 2257 * @see 2258 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification 2259 * (org.apache.activemq.command.MessageDispatchNotification) 2260 */ 2261 @Override 2262 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 2263 // do dispatch 2264 Subscription sub = getMatchingSubscription(messageDispatchNotification); 2265 if (sub != null) { 2266 MessageReference message = getMatchingMessage(messageDispatchNotification); 2267 sub.add(message); 2268 sub.processMessageDispatchNotification(messageDispatchNotification); 2269 } 2270 } 2271 2272 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) 2273 throws Exception { 2274 QueueMessageReference message = null; 2275 MessageId messageId = messageDispatchNotification.getMessageId(); 2276 2277 pagedInPendingDispatchLock.writeLock().lock(); 2278 try { 2279 for (MessageReference ref : dispatchPendingList) { 2280 if (messageId.equals(ref.getMessageId())) { 2281 message = (QueueMessageReference)ref; 2282 dispatchPendingList.remove(ref); 2283 break; 2284 } 2285 } 2286 } finally { 2287 pagedInPendingDispatchLock.writeLock().unlock(); 2288 } 2289 2290 if (message == null) { 2291 pagedInMessagesLock.readLock().lock(); 2292 try { 2293 message = (QueueMessageReference)pagedInMessages.get(messageId); 2294 } finally { 2295 pagedInMessagesLock.readLock().unlock(); 2296 } 2297 } 2298 2299 if (message == null) { 2300 messagesLock.writeLock().lock(); 2301 try { 2302 try { 2303 messages.setMaxBatchSize(getMaxPageSize()); 2304 messages.reset(); 2305 while (messages.hasNext()) { 2306 MessageReference node = messages.next(); 2307 messages.remove(); 2308 if (messageId.equals(node.getMessageId())) { 2309 message = this.createMessageReference(node.getMessage()); 2310 break; 2311 } 2312 } 2313 } finally { 2314 messages.release(); 2315 } 2316 } finally { 2317 messagesLock.writeLock().unlock(); 2318 } 2319 } 2320 2321 if (message == null) { 2322 Message msg = loadMessage(messageId); 2323 if (msg != null) { 2324 message = this.createMessageReference(msg); 2325 } 2326 } 2327 2328 if (message == null) { 2329 throw new JMSException("Slave broker out of sync with master - Message: " 2330 + messageDispatchNotification.getMessageId() + " on " 2331 + messageDispatchNotification.getDestination() + " does not exist among pending(" 2332 + dispatchPendingList.size() + ") for subscription: " 2333 + messageDispatchNotification.getConsumerId()); 2334 } 2335 return message; 2336 } 2337 2338 /** 2339 * Find a consumer that matches the id in the message dispatch notification 2340 * 2341 * @param messageDispatchNotification 2342 * @return sub or null if the subscription has been removed before dispatch 2343 * @throws JMSException 2344 */ 2345 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) 2346 throws JMSException { 2347 Subscription sub = null; 2348 consumersLock.readLock().lock(); 2349 try { 2350 for (Subscription s : consumers) { 2351 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) { 2352 sub = s; 2353 break; 2354 } 2355 } 2356 } finally { 2357 consumersLock.readLock().unlock(); 2358 } 2359 return sub; 2360 } 2361 2362 @Override 2363 public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) { 2364 if (oldPercentUsage > newPercentUsage) { 2365 asyncWakeup(); 2366 } 2367 } 2368 2369 @Override 2370 protected Logger getLog() { 2371 return LOG; 2372 } 2373 2374 protected boolean isOptimizeStorage(){ 2375 boolean result = false; 2376 if (isDoOptimzeMessageStorage()){ 2377 consumersLock.readLock().lock(); 2378 try{ 2379 if (consumers.isEmpty()==false){ 2380 result = true; 2381 for (Subscription s : consumers) { 2382 if (s.getPrefetchSize()==0){ 2383 result = false; 2384 break; 2385 } 2386 if (s.isSlowConsumer()){ 2387 result = false; 2388 break; 2389 } 2390 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 2391 result = false; 2392 break; 2393 } 2394 } 2395 } 2396 } finally { 2397 consumersLock.readLock().unlock(); 2398 } 2399 } 2400 return result; 2401 } 2402}