001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.TimeUnit; 026 027import javax.jms.JMSException; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 033import org.apache.activemq.command.ConsumerControl; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.Message; 036import org.apache.activemq.command.MessageAck; 037import org.apache.activemq.command.MessageDispatch; 038import org.apache.activemq.command.MessageDispatchNotification; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.MessagePull; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.transaction.Synchronization; 044import org.apache.activemq.transport.TransmitCallback; 045import org.apache.activemq.usage.SystemUsage; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A subscription that honors the pre-fetch option of the ConsumerInfo. 051 */ 052public abstract class PrefetchSubscription extends AbstractSubscription { 053 054 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class); 055 protected final Scheduler scheduler; 056 057 protected PendingMessageCursor pending; 058 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 059 private int maxProducersToAudit=32; 060 private int maxAuditDepth=2048; 061 protected final SystemUsage usageManager; 062 protected final Object pendingLock = new Object(); 063 protected final Object dispatchLock = new Object(); 064 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); 065 066 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException { 067 super(broker,context, info); 068 this.usageManager=usageManager; 069 pending = cursor; 070 try { 071 pending.start(); 072 } catch (Exception e) { 073 throw new JMSException(e.getMessage()); 074 } 075 this.scheduler = broker.getScheduler(); 076 } 077 078 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { 079 this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); 080 } 081 082 /** 083 * Allows a message to be pulled on demand by a client 084 */ 085 @Override 086 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 087 // The slave should not deliver pull messages. 088 // TODO: when the slave becomes a master, He should send a NULL message to all the 089 // consumers to 'wake them up' in case they were waiting for a message. 090 if (getPrefetchSize() == 0) { 091 prefetchExtension.set(pull.getQuantity()); 092 final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount(); 093 094 // Have the destination push us some messages. 095 for (Destination dest : destinations) { 096 dest.iterate(); 097 } 098 dispatchPending(); 099 100 synchronized(this) { 101 // If there was nothing dispatched.. we may need to setup a timeout. 102 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 103 // immediate timeout used by receiveNoWait() 104 if (pull.getTimeout() == -1) { 105 // Null message indicates the pull is done or did not have pending. 106 prefetchExtension.set(1); 107 add(QueueMessageReference.NULL_MESSAGE); 108 dispatchPending(); 109 } 110 if (pull.getTimeout() > 0) { 111 scheduler.executeAfterDelay(new Runnable() { 112 @Override 113 public void run() { 114 pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone()); 115 } 116 }, pull.getTimeout()); 117 } 118 } 119 } 120 } 121 return null; 122 } 123 124 /** 125 * Occurs when a pull times out. If nothing has been dispatched since the 126 * timeout was setup, then send the NULL message. 127 */ 128 final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) { 129 synchronized (pendingLock) { 130 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) { 131 try { 132 prefetchExtension.set(1); 133 add(QueueMessageReference.NULL_MESSAGE); 134 dispatchPending(); 135 } catch (Exception e) { 136 context.getConnection().serviceException(e); 137 } finally { 138 prefetchExtension.set(0); 139 } 140 } 141 } 142 } 143 144 @Override 145 public void add(MessageReference node) throws Exception { 146 synchronized (pendingLock) { 147 // The destination may have just been removed... 148 if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) { 149 // perhaps we should inform the caller that we are no longer valid to dispatch to? 150 return; 151 } 152 153 // Don't increment for the pullTimeout control message. 154 if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { 155 getSubscriptionStatistics().getEnqueues().increment(); 156 } 157 pending.addMessageLast(node); 158 } 159 dispatchPending(); 160 } 161 162 @Override 163 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 164 synchronized(pendingLock) { 165 try { 166 pending.reset(); 167 while (pending.hasNext()) { 168 MessageReference node = pending.next(); 169 node.decrementReferenceCount(); 170 if (node.getMessageId().equals(mdn.getMessageId())) { 171 // Synchronize between dispatched list and removal of messages from pending list 172 // related to remove subscription action 173 synchronized(dispatchLock) { 174 pending.remove(); 175 createMessageDispatch(node, node.getMessage()); 176 dispatched.add(node); 177 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 178 onDispatch(node, node.getMessage()); 179 } 180 return; 181 } 182 } 183 } finally { 184 pending.release(); 185 } 186 } 187 throw new JMSException( 188 "Slave broker out of sync with master: Dispatched message (" 189 + mdn.getMessageId() + ") was not in the pending list for " 190 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); 191 } 192 193 @Override 194 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 195 // Handle the standard acknowledgment case. 196 boolean callDispatchMatched = false; 197 Destination destination = null; 198 199 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { 200 // suppress unexpected ack exception in this expected case 201 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack); 202 return; 203 } 204 205 LOG.trace("ack: {}", ack); 206 207 synchronized(dispatchLock) { 208 if (ack.isStandardAck()) { 209 // First check if the ack matches the dispatched. When using failover this might 210 // not be the case. We don't ever want to ack the wrong messages. 211 assertAckMatchesDispatched(ack); 212 213 // Acknowledge all dispatched messages up till the message id of 214 // the acknowledgment. 215 boolean inAckRange = false; 216 List<MessageReference> removeList = new ArrayList<MessageReference>(); 217 for (final MessageReference node : dispatched) { 218 MessageId messageId = node.getMessageId(); 219 if (ack.getFirstMessageId() == null 220 || ack.getFirstMessageId().equals(messageId)) { 221 inAckRange = true; 222 } 223 if (inAckRange) { 224 // Don't remove the nodes until we are committed. 225 if (!context.isInTransaction()) { 226 getSubscriptionStatistics().getDequeues().increment(); 227 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 228 removeList.add(node); 229 contractPrefetchExtension(1); 230 } else { 231 registerRemoveSync(context, node); 232 } 233 acknowledge(context, ack, node); 234 if (ack.getLastMessageId().equals(messageId)) { 235 destination = (Destination) node.getRegionDestination(); 236 callDispatchMatched = true; 237 break; 238 } 239 } 240 } 241 for (final MessageReference node : removeList) { 242 dispatched.remove(node); 243 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 244 } 245 // this only happens after a reconnect - get an ack which is not 246 // valid 247 if (!callDispatchMatched) { 248 LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack); 249 } 250 } else if (ack.isIndividualAck()) { 251 // Message was delivered and acknowledge - but only delete the 252 // individual message 253 for (final MessageReference node : dispatched) { 254 MessageId messageId = node.getMessageId(); 255 if (ack.getLastMessageId().equals(messageId)) { 256 // Don't remove the nodes until we are committed - immediateAck option 257 if (!context.isInTransaction()) { 258 getSubscriptionStatistics().getDequeues().increment(); 259 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 260 dispatched.remove(node); 261 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 262 contractPrefetchExtension(1); 263 } else { 264 registerRemoveSync(context, node); 265 expandPrefetchExtension(1); 266 } 267 acknowledge(context, ack, node); 268 destination = (Destination) node.getRegionDestination(); 269 callDispatchMatched = true; 270 break; 271 } 272 } 273 } else if (ack.isDeliveredAck()) { 274 // Message was delivered but not acknowledged: update pre-fetch 275 // counters. 276 int index = 0; 277 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 278 final MessageReference node = iter.next(); 279 Destination nodeDest = (Destination) node.getRegionDestination(); 280 if (ack.getLastMessageId().equals(node.getMessageId())) { 281 expandPrefetchExtension(ack.getMessageCount()); 282 destination = nodeDest; 283 callDispatchMatched = true; 284 break; 285 } 286 } 287 if (!callDispatchMatched) { 288 throw new JMSException( 289 "Could not correlate acknowledgment with dispatched message: " 290 + ack); 291 } 292 } else if (ack.isExpiredAck()) { 293 // Message was expired 294 int index = 0; 295 boolean inAckRange = false; 296 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 297 final MessageReference node = iter.next(); 298 Destination nodeDest = (Destination) node.getRegionDestination(); 299 MessageId messageId = node.getMessageId(); 300 if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { 301 inAckRange = true; 302 } 303 if (inAckRange) { 304 Destination regionDestination = nodeDest; 305 if (broker.isExpired(node)) { 306 regionDestination.messageExpired(context, this, node); 307 } 308 iter.remove(); 309 nodeDest.getDestinationStatistics().getInflight().decrement(); 310 311 if (ack.getLastMessageId().equals(messageId)) { 312 contractPrefetchExtension(1); 313 destination = (Destination) node.getRegionDestination(); 314 callDispatchMatched = true; 315 break; 316 } 317 } 318 } 319 if (!callDispatchMatched) { 320 throw new JMSException( 321 "Could not correlate expiration acknowledgment with dispatched message: " 322 + ack); 323 } 324 } else if (ack.isRedeliveredAck()) { 325 // Message was re-delivered but it was not yet considered to be 326 // a DLQ message. 327 boolean inAckRange = false; 328 for (final MessageReference node : dispatched) { 329 MessageId messageId = node.getMessageId(); 330 if (ack.getFirstMessageId() == null 331 || ack.getFirstMessageId().equals(messageId)) { 332 inAckRange = true; 333 } 334 if (inAckRange) { 335 if (ack.getLastMessageId().equals(messageId)) { 336 destination = (Destination) node.getRegionDestination(); 337 callDispatchMatched = true; 338 break; 339 } 340 } 341 } 342 if (!callDispatchMatched) { 343 throw new JMSException( 344 "Could not correlate acknowledgment with dispatched message: " 345 + ack); 346 } 347 } else if (ack.isPoisonAck()) { 348 // TODO: what if the message is already in a DLQ??? 349 // Handle the poison ACK case: we need to send the message to a 350 // DLQ 351 if (ack.isInTransaction()) { 352 throw new JMSException("Poison ack cannot be transacted: " 353 + ack); 354 } 355 int index = 0; 356 boolean inAckRange = false; 357 List<MessageReference> removeList = new ArrayList<MessageReference>(); 358 for (final MessageReference node : dispatched) { 359 MessageId messageId = node.getMessageId(); 360 if (ack.getFirstMessageId() == null 361 || ack.getFirstMessageId().equals(messageId)) { 362 inAckRange = true; 363 } 364 if (inAckRange) { 365 sendToDLQ(context, node, ack.getPoisonCause()); 366 Destination nodeDest = (Destination) node.getRegionDestination(); 367 nodeDest.getDestinationStatistics() 368 .getInflight().decrement(); 369 removeList.add(node); 370 getSubscriptionStatistics().getDequeues().increment(); 371 index++; 372 acknowledge(context, ack, node); 373 if (ack.getLastMessageId().equals(messageId)) { 374 contractPrefetchExtension(1); 375 destination = nodeDest; 376 callDispatchMatched = true; 377 break; 378 } 379 } 380 } 381 for (final MessageReference node : removeList) { 382 dispatched.remove(node); 383 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 384 } 385 if (!callDispatchMatched) { 386 throw new JMSException( 387 "Could not correlate acknowledgment with dispatched message: " 388 + ack); 389 } 390 } 391 } 392 if (callDispatchMatched && destination != null) { 393 destination.wakeup(); 394 dispatchPending(); 395 396 if (pending.isEmpty()) { 397 wakeupDestinationsForDispatch(); 398 } 399 } else { 400 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack); 401 } 402 } 403 404 private void registerRemoveSync(ConnectionContext context, final MessageReference node) { 405 // setup a Synchronization to remove nodes from the 406 // dispatched list. 407 context.getTransaction().addSynchronization( 408 new Synchronization() { 409 410 @Override 411 public void afterCommit() 412 throws Exception { 413 Destination nodeDest = (Destination) node.getRegionDestination(); 414 synchronized (dispatchLock) { 415 getSubscriptionStatistics().getDequeues().increment(); 416 if (dispatched.remove(node)) { 417 // if consumer is removed, dispatched will be empty and inflight will 418 // already have been adjusted 419 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 420 nodeDest.getDestinationStatistics().getInflight().decrement(); 421 } 422 } 423 contractPrefetchExtension(1); 424 nodeDest.wakeup(); 425 dispatchPending(); 426 } 427 428 @Override 429 public void afterRollback() throws Exception { 430 contractPrefetchExtension(1); 431 } 432 }); 433 } 434 435 /** 436 * Checks an ack versus the contents of the dispatched list. 437 * called with dispatchLock held 438 * @param ack 439 * @throws JMSException if it does not match 440 */ 441 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { 442 MessageId firstAckedMsg = ack.getFirstMessageId(); 443 MessageId lastAckedMsg = ack.getLastMessageId(); 444 int checkCount = 0; 445 boolean checkFoundStart = false; 446 boolean checkFoundEnd = false; 447 for (MessageReference node : dispatched) { 448 449 if (firstAckedMsg == null) { 450 checkFoundStart = true; 451 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { 452 checkFoundStart = true; 453 } 454 455 if (checkFoundStart) { 456 checkCount++; 457 } 458 459 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { 460 checkFoundEnd = true; 461 break; 462 } 463 } 464 if (!checkFoundStart && firstAckedMsg != null) 465 throw new JMSException("Unmatched acknowledge: " + ack 466 + "; Could not find Message-ID " + firstAckedMsg 467 + " in dispatched-list (start of ack)"); 468 if (!checkFoundEnd && lastAckedMsg != null) 469 throw new JMSException("Unmatched acknowledge: " + ack 470 + "; Could not find Message-ID " + lastAckedMsg 471 + " in dispatched-list (end of ack)"); 472 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { 473 throw new JMSException("Unmatched acknowledge: " + ack 474 + "; Expected message count (" + ack.getMessageCount() 475 + ") differs from count in dispatched-list (" + checkCount 476 + ")"); 477 } 478 } 479 480 /** 481 * 482 * @param context 483 * @param node 484 * @param poisonCause 485 * @throws IOException 486 * @throws Exception 487 */ 488 protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception { 489 broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause); 490 } 491 492 @Override 493 public int getInFlightSize() { 494 return dispatched.size(); 495 } 496 497 /** 498 * Used to determine if the broker can dispatch to the consumer. 499 * 500 * @return 501 */ 502 @Override 503 public boolean isFull() { 504 return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); 505 } 506 507 /** 508 * @return true when 60% or more room is left for dispatching messages 509 */ 510 @Override 511 public boolean isLowWaterMark() { 512 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 513 } 514 515 /** 516 * @return true when 10% or less room is left for dispatching messages 517 */ 518 @Override 519 public boolean isHighWaterMark() { 520 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 521 } 522 523 @Override 524 public int countBeforeFull() { 525 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); 526 } 527 528 @Override 529 public int getPendingQueueSize() { 530 return pending.size(); 531 } 532 533 @Override 534 public int getDispatchedQueueSize() { 535 return dispatched.size(); 536 } 537 538 @Override 539 public long getDequeueCounter() { 540 return getSubscriptionStatistics().getDequeues().getCount(); 541 } 542 543 @Override 544 public long getDispatchedCounter() { 545 return getSubscriptionStatistics().getDispatched().getCount(); 546 } 547 548 @Override 549 public long getEnqueueCounter() { 550 return getSubscriptionStatistics().getEnqueues().getCount(); 551 } 552 553 @Override 554 public boolean isRecoveryRequired() { 555 return pending.isRecoveryRequired(); 556 } 557 558 public PendingMessageCursor getPending() { 559 return this.pending; 560 } 561 562 public void setPending(PendingMessageCursor pending) { 563 this.pending = pending; 564 if (this.pending!=null) { 565 this.pending.setSystemUsage(usageManager); 566 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 567 } 568 } 569 570 @Override 571 public void add(ConnectionContext context, Destination destination) throws Exception { 572 synchronized(pendingLock) { 573 super.add(context, destination); 574 pending.add(context, destination); 575 } 576 } 577 578 @Override 579 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 580 return remove(context, destination, dispatched); 581 } 582 583 public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception { 584 LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>(); 585 synchronized(pendingLock) { 586 super.remove(context, destination); 587 // Here is a potential problem concerning Inflight stat: 588 // Messages not already committed or rolled back may not be removed from dispatched list at the moment 589 // Except if each commit or rollback callback action comes before remove of subscriber. 590 redispatch.addAll(pending.remove(context, destination)); 591 592 if (dispatched == null) { 593 return redispatch; 594 } 595 596 // Synchronized to DispatchLock if necessary 597 if (dispatched == this.dispatched) { 598 synchronized(dispatchLock) { 599 addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); 600 } 601 } else { 602 addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); 603 } 604 } 605 606 return redispatch; 607 } 608 609 private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) { 610 ArrayList<MessageReference> references = new ArrayList<MessageReference>(); 611 for (MessageReference r : dispatched) { 612 if (r.getRegionDestination() == destination) { 613 references.add(r); 614 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); 615 } 616 } 617 redispatch.addAll(0, references); 618 destination.getDestinationStatistics().getInflight().subtract(references.size()); 619 dispatched.removeAll(references); 620 } 621 622 // made public so it can be used in MQTTProtocolConverter 623 public void dispatchPending() throws IOException { 624 List<Destination> slowConsumerTargets = null; 625 626 synchronized(pendingLock) { 627 try { 628 int numberToDispatch = countBeforeFull(); 629 if (numberToDispatch > 0) { 630 setSlowConsumer(false); 631 setPendingBatchSize(pending, numberToDispatch); 632 int count = 0; 633 pending.reset(); 634 while (count < numberToDispatch && !isFull() && pending.hasNext()) { 635 MessageReference node = pending.next(); 636 if (node == null) { 637 break; 638 } 639 640 // Synchronize between dispatched list and remove of message from pending list 641 // related to remove subscription action 642 synchronized(dispatchLock) { 643 pending.remove(); 644 if (!isDropped(node) && canDispatch(node)) { 645 646 // Message may have been sitting in the pending 647 // list a while waiting for the consumer to ak the message. 648 if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) { 649 //increment number to dispatch 650 numberToDispatch++; 651 if (broker.isExpired(node)) { 652 ((Destination)node.getRegionDestination()).messageExpired(context, this, node); 653 } 654 655 if (!isBrowser()) { 656 node.decrementReferenceCount(); 657 continue; 658 } 659 } 660 dispatch(node); 661 count++; 662 } 663 } 664 // decrement after dispatch has taken ownership to avoid usage jitter 665 node.decrementReferenceCount(); 666 } 667 } else if (!isSlowConsumer()) { 668 setSlowConsumer(true); 669 slowConsumerTargets = destinations; 670 } 671 } finally { 672 pending.release(); 673 } 674 } 675 676 if (slowConsumerTargets != null) { 677 for (Destination dest : slowConsumerTargets) { 678 dest.slowConsumer(context, this); 679 } 680 } 681 } 682 683 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 684 pending.setMaxBatchSize(numberToDispatch); 685 } 686 687 // called with dispatchLock held 688 protected boolean dispatch(final MessageReference node) throws IOException { 689 final Message message = node.getMessage(); 690 if (message == null) { 691 return false; 692 } 693 694 okForAckAsDispatchDone.countDown(); 695 696 MessageDispatch md = createMessageDispatch(node, message); 697 if (node != QueueMessageReference.NULL_MESSAGE) { 698 getSubscriptionStatistics().getDispatched().increment(); 699 dispatched.add(node); 700 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 701 } 702 if (getPrefetchSize() == 0) { 703 while (true) { 704 int currentExtension = prefetchExtension.get(); 705 int newExtension = Math.max(0, currentExtension - 1); 706 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 707 break; 708 } 709 } 710 } 711 if (info.isDispatchAsync()) { 712 md.setTransmitCallback(new TransmitCallback() { 713 714 @Override 715 public void onSuccess() { 716 // Since the message gets queued up in async dispatch, we don't want to 717 // decrease the reference count until it gets put on the wire. 718 onDispatch(node, message); 719 } 720 721 @Override 722 public void onFailure() { 723 Destination nodeDest = (Destination) node.getRegionDestination(); 724 if (nodeDest != null) { 725 if (node != QueueMessageReference.NULL_MESSAGE) { 726 nodeDest.getDestinationStatistics().getDispatched().increment(); 727 nodeDest.getDestinationStatistics().getInflight().increment(); 728 LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 729 } 730 } 731 if (node instanceof QueueMessageReference) { 732 ((QueueMessageReference) node).unlock(); 733 } 734 } 735 }); 736 context.getConnection().dispatchAsync(md); 737 } else { 738 context.getConnection().dispatchSync(md); 739 onDispatch(node, message); 740 } 741 return true; 742 } 743 744 protected void onDispatch(final MessageReference node, final Message message) { 745 Destination nodeDest = (Destination) node.getRegionDestination(); 746 if (nodeDest != null) { 747 if (node != QueueMessageReference.NULL_MESSAGE) { 748 nodeDest.getDestinationStatistics().getDispatched().increment(); 749 nodeDest.getDestinationStatistics().getInflight().increment(); 750 LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 751 } 752 } 753 754 if (info.isDispatchAsync()) { 755 try { 756 dispatchPending(); 757 } catch (IOException e) { 758 context.getConnection().serviceExceptionAsync(e); 759 } 760 } 761 } 762 763 /** 764 * inform the MessageConsumer on the client to change it's prefetch 765 * 766 * @param newPrefetch 767 */ 768 @Override 769 public void updateConsumerPrefetch(int newPrefetch) { 770 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 771 ConsumerControl cc = new ConsumerControl(); 772 cc.setConsumerId(info.getConsumerId()); 773 cc.setPrefetch(newPrefetch); 774 context.getConnection().dispatchAsync(cc); 775 } 776 } 777 778 /** 779 * @param node 780 * @param message 781 * @return MessageDispatch 782 */ 783 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 784 MessageDispatch md = new MessageDispatch(); 785 md.setConsumerId(info.getConsumerId()); 786 787 if (node == QueueMessageReference.NULL_MESSAGE) { 788 md.setMessage(null); 789 md.setDestination(null); 790 } else { 791 Destination regionDestination = (Destination) node.getRegionDestination(); 792 md.setDestination(regionDestination.getActiveMQDestination()); 793 md.setMessage(message); 794 md.setRedeliveryCounter(node.getRedeliveryCounter()); 795 } 796 797 return md; 798 } 799 800 /** 801 * Use when a matched message is about to be dispatched to the client. 802 * 803 * @param node 804 * @return false if the message should not be dispatched to the client 805 * (another sub may have already dispatched it for example). 806 * @throws IOException 807 */ 808 protected abstract boolean canDispatch(MessageReference node) throws IOException; 809 810 protected abstract boolean isDropped(MessageReference node); 811 812 /** 813 * Used during acknowledgment to remove the message. 814 * 815 * @throws IOException 816 */ 817 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException; 818 819 820 public int getMaxProducersToAudit() { 821 return maxProducersToAudit; 822 } 823 824 public void setMaxProducersToAudit(int maxProducersToAudit) { 825 this.maxProducersToAudit = maxProducersToAudit; 826 if (this.pending != null) { 827 this.pending.setMaxProducersToAudit(maxProducersToAudit); 828 } 829 } 830 831 public int getMaxAuditDepth() { 832 return maxAuditDepth; 833 } 834 835 public void setMaxAuditDepth(int maxAuditDepth) { 836 this.maxAuditDepth = maxAuditDepth; 837 if (this.pending != null) { 838 this.pending.setMaxAuditDepth(maxAuditDepth); 839 } 840 } 841 842 @Override 843 public void setPrefetchSize(int prefetchSize) { 844 this.info.setPrefetchSize(prefetchSize); 845 try { 846 this.dispatchPending(); 847 } catch (Exception e) { 848 LOG.trace("Caught exception during dispatch after prefetch change.", e); 849 } 850 } 851}