001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Iterator; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicReference; 033 034import javax.jms.IllegalStateException; 035import javax.jms.InvalidDestinationException; 036import javax.jms.JMSException; 037import javax.jms.Message; 038import javax.jms.MessageConsumer; 039import javax.jms.MessageListener; 040import javax.jms.TransactionRolledBackException; 041 042import org.apache.activemq.blob.BlobDownloader; 043import org.apache.activemq.command.*; 044import org.apache.activemq.management.JMSConsumerStatsImpl; 045import org.apache.activemq.management.StatsCapable; 046import org.apache.activemq.management.StatsImpl; 047import org.apache.activemq.selector.SelectorParser; 048import org.apache.activemq.transaction.Synchronization; 049import org.apache.activemq.util.Callback; 050import org.apache.activemq.util.IntrospectionSupport; 051import org.apache.activemq.util.JMSExceptionSupport; 052import org.apache.activemq.util.ThreadPoolUtils; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 058 * from a destination. A <CODE> MessageConsumer</CODE> object is created by 059 * passing a <CODE>Destination</CODE> object to a message-consumer creation 060 * method supplied by a session. 061 * <P> 062 * <CODE>MessageConsumer</CODE> is the parent interface for all message 063 * consumers. 064 * <P> 065 * A message consumer can be created with a message selector. A message selector 066 * allows the client to restrict the messages delivered to the message consumer 067 * to those that match the selector. 068 * <P> 069 * A client may either synchronously receive a message consumer's messages or 070 * have the consumer asynchronously deliver them as they arrive. 071 * <P> 072 * For synchronous receipt, a client can request the next message from a message 073 * consumer using one of its <CODE> receive</CODE> methods. There are several 074 * variations of <CODE>receive</CODE> that allow a client to poll or wait for 075 * the next message. 076 * <P> 077 * For asynchronous delivery, a client can register a 078 * <CODE>MessageListener</CODE> object with a message consumer. As messages 079 * arrive at the message consumer, it delivers them by calling the 080 * <CODE>MessageListener</CODE>'s<CODE> 081 * onMessage</CODE> method. 082 * <P> 083 * It is a client programming error for a <CODE>MessageListener</CODE> to 084 * throw an exception. 085 * 086 * 087 * @see javax.jms.MessageConsumer 088 * @see javax.jms.QueueReceiver 089 * @see javax.jms.TopicSubscriber 090 * @see javax.jms.Session 091 */ 092public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { 093 094 @SuppressWarnings("serial") 095 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> { 096 final TransactionId transactionId; 097 public PreviouslyDeliveredMap(TransactionId transactionId) { 098 this.transactionId = transactionId; 099 } 100 } 101 102 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class); 103 protected final ActiveMQSession session; 104 protected final ConsumerInfo info; 105 106 // These are the messages waiting to be delivered to the client 107 protected final MessageDispatchChannel unconsumedMessages; 108 109 // The are the messages that were delivered to the consumer but that have 110 // not been acknowledged. It's kept in reverse order since we 111 // Always walk list in reverse order. 112 protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); 113 // track duplicate deliveries in a transaction such that the tx integrity can be validated 114 private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; 115 private int deliveredCounter; 116 private int additionalWindowSize; 117 private long redeliveryDelay; 118 private int ackCounter; 119 private int dispatchedCount; 120 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>(); 121 private final JMSConsumerStatsImpl stats; 122 123 private final String selector; 124 private boolean synchronizationRegistered; 125 private final AtomicBoolean started = new AtomicBoolean(false); 126 127 private MessageAvailableListener availableListener; 128 129 private RedeliveryPolicy redeliveryPolicy; 130 private boolean optimizeAcknowledge; 131 private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); 132 private ExecutorService executorService; 133 private MessageTransformer transformer; 134 private boolean clearDeliveredList; 135 AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0); 136 137 private MessageAck pendingAck; 138 private long lastDeliveredSequenceId = -1; 139 140 private IOException failureError; 141 142 private long optimizeAckTimestamp = System.currentTimeMillis(); 143 private long optimizeAcknowledgeTimeOut = 0; 144 private long optimizedAckScheduledAckInterval = 0; 145 private Runnable optimizedAckTask; 146 private long failoverRedeliveryWaitPeriod = 0; 147 private boolean transactedIndividualAck = false; 148 private boolean nonBlockingRedelivery = false; 149 private boolean consumerExpiryCheckEnabled = true; 150 151 /** 152 * Create a MessageConsumer 153 * 154 * @param session 155 * @param dest 156 * @param name 157 * @param selector 158 * @param prefetch 159 * @param maximumPendingMessageCount 160 * @param noLocal 161 * @param browser 162 * @param dispatchAsync 163 * @param messageListener 164 * @throws JMSException 165 */ 166 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, 167 String name, String selector, int prefetch, 168 int maximumPendingMessageCount, boolean noLocal, boolean browser, 169 boolean dispatchAsync, MessageListener messageListener) throws JMSException { 170 if (dest == null) { 171 throw new InvalidDestinationException("Don't understand null destinations"); 172 } else if (dest.getPhysicalName() == null) { 173 throw new InvalidDestinationException("The destination object was not given a physical name."); 174 } else if (dest.isTemporary()) { 175 String physicalName = dest.getPhysicalName(); 176 177 if (physicalName == null) { 178 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 179 } 180 181 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); 182 183 if (physicalName.indexOf(connectionID) < 0) { 184 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 185 } 186 187 if (session.connection.isDeleted(dest)) { 188 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 189 } 190 if (prefetch < 0) { 191 throw new JMSException("Cannot have a prefetch size less than zero"); 192 } 193 } 194 if (session.connection.isMessagePrioritySupported()) { 195 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); 196 }else { 197 this.unconsumedMessages = new FifoMessageDispatchChannel(); 198 } 199 200 this.session = session; 201 this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest); 202 if (this.redeliveryPolicy == null) { 203 this.redeliveryPolicy = new RedeliveryPolicy(); 204 } 205 setTransformer(session.getTransformer()); 206 207 this.info = new ConsumerInfo(consumerId); 208 this.info.setExclusive(this.session.connection.isExclusiveConsumer()); 209 this.info.setClientId(this.session.connection.getClientID()); 210 this.info.setSubscriptionName(name); 211 this.info.setPrefetchSize(prefetch); 212 this.info.setCurrentPrefetchSize(prefetch); 213 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); 214 this.info.setNoLocal(noLocal); 215 this.info.setDispatchAsync(dispatchAsync); 216 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); 217 this.info.setSelector(null); 218 219 // Allows the options on the destination to configure the consumerInfo 220 if (dest.getOptions() != null) { 221 Map<String, Object> options = IntrospectionSupport.extractProperties( 222 new HashMap<String, Object>(dest.getOptions()), "consumer."); 223 IntrospectionSupport.setProperties(this.info, options); 224 if (options.size() > 0) { 225 String msg = "There are " + options.size() 226 + " consumer options that couldn't be set on the consumer." 227 + " Check the options are spelled correctly." 228 + " Unknown parameters=[" + options + "]." 229 + " This consumer cannot be started."; 230 LOG.warn(msg); 231 throw new ConfigurationException(msg); 232 } 233 } 234 235 this.info.setDestination(dest); 236 this.info.setBrowser(browser); 237 if (selector != null && selector.trim().length() != 0) { 238 // Validate the selector 239 SelectorParser.parse(selector); 240 this.info.setSelector(selector); 241 this.selector = selector; 242 } else if (info.getSelector() != null) { 243 // Validate the selector 244 SelectorParser.parse(this.info.getSelector()); 245 this.selector = this.info.getSelector(); 246 } else { 247 this.selector = null; 248 } 249 250 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); 251 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() 252 && !info.isBrowser(); 253 if (this.optimizeAcknowledge) { 254 this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); 255 setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval()); 256 } 257 258 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); 259 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); 260 this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); 261 this.transactedIndividualAck = session.connection.isTransactedIndividualAck() 262 || this.nonBlockingRedelivery 263 || session.connection.isMessagePrioritySupported(); 264 this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled(); 265 if (messageListener != null) { 266 setMessageListener(messageListener); 267 } 268 try { 269 this.session.addConsumer(this); 270 this.session.syncSendPacket(info); 271 } catch (JMSException e) { 272 this.session.removeConsumer(this); 273 throw e; 274 } 275 276 if (session.connection.isStarted()) { 277 start(); 278 } 279 } 280 281 private boolean isAutoAcknowledgeEach() { 282 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() ); 283 } 284 285 private boolean isAutoAcknowledgeBatch() { 286 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; 287 } 288 289 @Override 290 public StatsImpl getStats() { 291 return stats; 292 } 293 294 public JMSConsumerStatsImpl getConsumerStats() { 295 return stats; 296 } 297 298 public RedeliveryPolicy getRedeliveryPolicy() { 299 return redeliveryPolicy; 300 } 301 302 /** 303 * Sets the redelivery policy used when messages are redelivered 304 */ 305 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 306 this.redeliveryPolicy = redeliveryPolicy; 307 } 308 309 public MessageTransformer getTransformer() { 310 return transformer; 311 } 312 313 /** 314 * Sets the transformer used to transform messages before they are sent on 315 * to the JMS bus 316 */ 317 public void setTransformer(MessageTransformer transformer) { 318 this.transformer = transformer; 319 } 320 321 /** 322 * @return Returns the value. 323 */ 324 public ConsumerId getConsumerId() { 325 return info.getConsumerId(); 326 } 327 328 /** 329 * @return the consumer name - used for durable consumers 330 */ 331 public String getConsumerName() { 332 return this.info.getSubscriptionName(); 333 } 334 335 /** 336 * @return true if this consumer does not accept locally produced messages 337 */ 338 protected boolean isNoLocal() { 339 return info.isNoLocal(); 340 } 341 342 /** 343 * Retrieve is a browser 344 * 345 * @return true if a browser 346 */ 347 protected boolean isBrowser() { 348 return info.isBrowser(); 349 } 350 351 /** 352 * @return ActiveMQDestination 353 */ 354 protected ActiveMQDestination getDestination() { 355 return info.getDestination(); 356 } 357 358 /** 359 * @return Returns the prefetchNumber. 360 */ 361 public int getPrefetchNumber() { 362 return info.getPrefetchSize(); 363 } 364 365 /** 366 * @return true if this is a durable topic subscriber 367 */ 368 public boolean isDurableSubscriber() { 369 return info.getSubscriptionName() != null && info.getDestination().isTopic(); 370 } 371 372 /** 373 * Gets this message consumer's message selector expression. 374 * 375 * @return this message consumer's message selector, or null if no message 376 * selector exists for the message consumer (that is, if the message 377 * selector was not set or was set to null or the empty string) 378 * @throws JMSException if the JMS provider fails to receive the next 379 * message due to some internal error. 380 */ 381 @Override 382 public String getMessageSelector() throws JMSException { 383 checkClosed(); 384 return selector; 385 } 386 387 /** 388 * Gets the message consumer's <CODE>MessageListener</CODE>. 389 * 390 * @return the listener for the message consumer, or null if no listener is 391 * set 392 * @throws JMSException if the JMS provider fails to get the message 393 * listener due to some internal error. 394 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) 395 */ 396 @Override 397 public MessageListener getMessageListener() throws JMSException { 398 checkClosed(); 399 return this.messageListener.get(); 400 } 401 402 /** 403 * Sets the message consumer's <CODE>MessageListener</CODE>. 404 * <P> 405 * Setting the message listener to null is the equivalent of unsetting the 406 * message listener for the message consumer. 407 * <P> 408 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> 409 * while messages are being consumed by an existing listener or the consumer 410 * is being used to consume messages synchronously is undefined. 411 * 412 * @param listener the listener to which the messages are to be delivered 413 * @throws JMSException if the JMS provider fails to receive the next 414 * message due to some internal error. 415 * @see javax.jms.MessageConsumer#getMessageListener 416 */ 417 @Override 418 public void setMessageListener(MessageListener listener) throws JMSException { 419 checkClosed(); 420 if (info.getPrefetchSize() == 0) { 421 throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); 422 } 423 if (listener != null) { 424 boolean wasRunning = session.isRunning(); 425 if (wasRunning) { 426 session.stop(); 427 } 428 429 this.messageListener.set(listener); 430 session.redispatch(this, unconsumedMessages); 431 432 if (wasRunning) { 433 session.start(); 434 } 435 } else { 436 this.messageListener.set(null); 437 } 438 } 439 440 @Override 441 public MessageAvailableListener getAvailableListener() { 442 return availableListener; 443 } 444 445 /** 446 * Sets the listener used to notify synchronous consumers that there is a 447 * message available so that the {@link MessageConsumer#receiveNoWait()} can 448 * be called. 449 */ 450 @Override 451 public void setAvailableListener(MessageAvailableListener availableListener) { 452 this.availableListener = availableListener; 453 } 454 455 /** 456 * Used to get an enqueued message from the unconsumedMessages list. The 457 * amount of time this method blocks is based on the timeout value. - if 458 * timeout==-1 then it blocks until a message is received. - if timeout==0 459 * then it it tries to not block at all, it returns a message if it is 460 * available - if timeout>0 then it blocks up to timeout amount of time. 461 * Expired messages will consumed by this method. 462 * 463 * @throws JMSException 464 * @return null if we timeout or if the consumer is closed. 465 */ 466 private MessageDispatch dequeue(long timeout) throws JMSException { 467 try { 468 long deadline = 0; 469 if (timeout > 0) { 470 deadline = System.currentTimeMillis() + timeout; 471 } 472 while (true) { 473 MessageDispatch md = unconsumedMessages.dequeue(timeout); 474 if (md == null) { 475 if (timeout > 0 && !unconsumedMessages.isClosed()) { 476 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 477 } else { 478 if (failureError != null) { 479 throw JMSExceptionSupport.create(failureError); 480 } else { 481 return null; 482 } 483 } 484 } else if (md.getMessage() == null) { 485 return null; 486 } else if (consumeExpiredMessage(md)) { 487 LOG.debug("{} received expired message: {}", getConsumerId(), md); 488 beforeMessageIsConsumed(md); 489 afterMessageIsConsumed(md, true); 490 if (timeout > 0) { 491 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 492 } 493 sendPullCommand(timeout); 494 } else if (redeliveryExceeded(md)) { 495 LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md); 496 posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); 497 if (timeout > 0) { 498 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 499 } 500 sendPullCommand(timeout); 501 } else { 502 if (LOG.isTraceEnabled()) { 503 LOG.trace(getConsumerId() + " received message: " + md); 504 } 505 return md; 506 } 507 } 508 } catch (InterruptedException e) { 509 Thread.currentThread().interrupt(); 510 throw JMSExceptionSupport.create(e); 511 } 512 } 513 514 private boolean consumeExpiredMessage(MessageDispatch dispatch) { 515 return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired(); 516 } 517 518 private void posionAck(MessageDispatch md, String cause) throws JMSException { 519 MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 520 posionAck.setFirstMessageId(md.getMessage().getMessageId()); 521 posionAck.setPoisonCause(new Throwable(cause)); 522 session.sendAck(posionAck); 523 } 524 525 private boolean redeliveryExceeded(MessageDispatch md) { 526 try { 527 return session.getTransacted() 528 && redeliveryPolicy != null 529 && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 530 && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries() 531 // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin 532 && md.getMessage().getProperty("redeliveryDelay") == null; 533 } catch (Exception ignored) { 534 return false; 535 } 536 } 537 538 /** 539 * Receives the next message produced for this message consumer. 540 * <P> 541 * This call blocks indefinitely until a message is produced or until this 542 * message consumer is closed. 543 * <P> 544 * If this <CODE>receive</CODE> is done within a transaction, the consumer 545 * retains the message until the transaction commits. 546 * 547 * @return the next message produced for this message consumer, or null if 548 * this message consumer is concurrently closed 549 */ 550 @Override 551 public Message receive() throws JMSException { 552 checkClosed(); 553 checkMessageListener(); 554 555 sendPullCommand(0); 556 MessageDispatch md = dequeue(-1); 557 if (md == null) { 558 return null; 559 } 560 561 beforeMessageIsConsumed(md); 562 afterMessageIsConsumed(md, false); 563 564 return createActiveMQMessage(md); 565 } 566 567 /** 568 * @param md 569 * @return 570 */ 571 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 572 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); 573 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { 574 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy())); 575 } 576 if (m.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) { 577 ((ActiveMQObjectMessage)m).setTrustAllPackages(session.getConnection().isTrustAllPackages()); 578 ((ActiveMQObjectMessage)m).setTrustedPackages(session.getConnection().getTrustedPackages()); 579 } 580 if (transformer != null) { 581 Message transformedMessage = transformer.consumerTransform(session, this, m); 582 if (transformedMessage != null) { 583 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); 584 } 585 } 586 if (session.isClientAcknowledge()) { 587 m.setAcknowledgeCallback(new Callback() { 588 @Override 589 public void execute() throws Exception { 590 session.checkClosed(); 591 session.acknowledge(); 592 } 593 }); 594 } else if (session.isIndividualAcknowledge()) { 595 m.setAcknowledgeCallback(new Callback() { 596 @Override 597 public void execute() throws Exception { 598 session.checkClosed(); 599 acknowledge(md); 600 } 601 }); 602 } 603 return m; 604 } 605 606 /** 607 * Receives the next message that arrives within the specified timeout 608 * interval. 609 * <P> 610 * This call blocks until a message arrives, the timeout expires, or this 611 * message consumer is closed. A <CODE>timeout</CODE> of zero never 612 * expires, and the call blocks indefinitely. 613 * 614 * @param timeout the timeout value (in milliseconds), a time out of zero 615 * never expires. 616 * @return the next message produced for this message consumer, or null if 617 * the timeout expires or this message consumer is concurrently 618 * closed 619 */ 620 @Override 621 public Message receive(long timeout) throws JMSException { 622 checkClosed(); 623 checkMessageListener(); 624 if (timeout == 0) { 625 return this.receive(); 626 } 627 628 sendPullCommand(timeout); 629 while (timeout > 0) { 630 631 MessageDispatch md; 632 if (info.getPrefetchSize() == 0) { 633 md = dequeue(-1); // We let the broker let us know when we timeout. 634 } else { 635 md = dequeue(timeout); 636 } 637 638 if (md == null) { 639 return null; 640 } 641 642 beforeMessageIsConsumed(md); 643 afterMessageIsConsumed(md, false); 644 return createActiveMQMessage(md); 645 } 646 return null; 647 } 648 649 /** 650 * Receives the next message if one is immediately available. 651 * 652 * @return the next message produced for this message consumer, or null if 653 * one is not available 654 * @throws JMSException if the JMS provider fails to receive the next 655 * message due to some internal error. 656 */ 657 @Override 658 public Message receiveNoWait() throws JMSException { 659 checkClosed(); 660 checkMessageListener(); 661 sendPullCommand(-1); 662 663 MessageDispatch md; 664 if (info.getPrefetchSize() == 0) { 665 md = dequeue(-1); // We let the broker let us know when we 666 // timeout. 667 } else { 668 md = dequeue(0); 669 } 670 671 if (md == null) { 672 return null; 673 } 674 675 beforeMessageIsConsumed(md); 676 afterMessageIsConsumed(md, false); 677 return createActiveMQMessage(md); 678 } 679 680 /** 681 * Closes the message consumer. 682 * <P> 683 * Since a provider may allocate some resources on behalf of a <CODE> 684 * MessageConsumer</CODE> 685 * outside the Java virtual machine, clients should close them when they are 686 * not needed. Relying on garbage collection to eventually reclaim these 687 * resources may not be timely enough. 688 * <P> 689 * This call blocks until a <CODE>receive</CODE> or message listener in 690 * progress has completed. A blocked message consumer <CODE>receive </CODE> 691 * call returns null when this message consumer is closed. 692 * 693 * @throws JMSException if the JMS provider fails to close the consumer due 694 * to some internal error. 695 */ 696 @Override 697 public void close() throws JMSException { 698 if (!unconsumedMessages.isClosed()) { 699 if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) { 700 session.getTransactionContext().addSynchronization(new Synchronization() { 701 @Override 702 public void afterCommit() throws Exception { 703 doClose(); 704 } 705 706 @Override 707 public void afterRollback() throws Exception { 708 doClose(); 709 } 710 }); 711 } else { 712 doClose(); 713 } 714 } 715 } 716 717 void doClose() throws JMSException { 718 dispose(); 719 RemoveInfo removeCommand = info.createRemoveCommand(); 720 LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId); 721 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 722 this.session.asyncSendPacket(removeCommand); 723 } 724 725 void inProgressClearRequired() { 726 inProgressClearRequiredFlag.incrementAndGet(); 727 // deal with delivered messages async to avoid lock contention with in progress acks 728 clearDeliveredList = true; 729 // force a rollback if we will be acking in a transaction after/during failover 730 // bc acks are async they may not get there reliably on reconnect and the consumer 731 // may not be aware of the reconnect in a timely fashion if in onMessage 732 if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) { 733 session.getTransactionContext().setRollbackOnly(true); 734 } 735 } 736 737 void clearMessagesInProgress() { 738 if (inProgressClearRequiredFlag.get() > 0) { 739 synchronized (unconsumedMessages.getMutex()) { 740 if (inProgressClearRequiredFlag.get() > 0) { 741 LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size()); 742 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer 743 List<MessageDispatch> list = unconsumedMessages.removeAll(); 744 if (!this.info.isBrowser()) { 745 for (MessageDispatch old : list) { 746 session.connection.rollbackDuplicate(this, old.getMessage()); 747 } 748 } 749 // allow dispatch on this connection to resume 750 session.connection.transportInterruptionProcessingComplete(); 751 inProgressClearRequiredFlag.decrementAndGet(); 752 753 // Wake up any blockers and allow them to recheck state. 754 unconsumedMessages.getMutex().notifyAll(); 755 } 756 } 757 } 758 clearDeliveredList(); 759 } 760 761 void deliverAcks() { 762 MessageAck ack = null; 763 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 764 if (isAutoAcknowledgeEach()) { 765 synchronized(deliveredMessages) { 766 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 767 if (ack != null) { 768 deliveredMessages.clear(); 769 ackCounter = 0; 770 } else { 771 ack = pendingAck; 772 pendingAck = null; 773 } 774 } 775 } else if (pendingAck != null && pendingAck.isStandardAck()) { 776 ack = pendingAck; 777 pendingAck = null; 778 } 779 if (ack != null) { 780 final MessageAck ackToSend = ack; 781 782 if (executorService == null) { 783 executorService = Executors.newSingleThreadExecutor(); 784 } 785 executorService.submit(new Runnable() { 786 @Override 787 public void run() { 788 try { 789 session.sendAck(ackToSend,true); 790 } catch (JMSException e) { 791 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); 792 } finally { 793 deliveryingAcknowledgements.set(false); 794 } 795 } 796 }); 797 } else { 798 deliveryingAcknowledgements.set(false); 799 } 800 } 801 } 802 803 public void dispose() throws JMSException { 804 if (!unconsumedMessages.isClosed()) { 805 806 // Do we have any acks we need to send out before closing? 807 // Ack any delivered messages now. 808 if (!session.getTransacted()) { 809 deliverAcks(); 810 if (isAutoAcknowledgeBatch()) { 811 acknowledge(); 812 } 813 } 814 if (executorService != null) { 815 ThreadPoolUtils.shutdownGraceful(executorService, 60000L); 816 executorService = null; 817 } 818 if (optimizedAckTask != null) { 819 this.session.connection.getScheduler().cancel(optimizedAckTask); 820 optimizedAckTask = null; 821 } 822 823 if (session.isClientAcknowledge()) { 824 if (!this.info.isBrowser()) { 825 // rollback duplicates that aren't acknowledged 826 List<MessageDispatch> tmp = null; 827 synchronized (this.deliveredMessages) { 828 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages); 829 } 830 for (MessageDispatch old : tmp) { 831 this.session.connection.rollbackDuplicate(this, old.getMessage()); 832 } 833 tmp.clear(); 834 } 835 } 836 if (!session.isTransacted()) { 837 synchronized(deliveredMessages) { 838 deliveredMessages.clear(); 839 } 840 } 841 unconsumedMessages.close(); 842 this.session.removeConsumer(this); 843 List<MessageDispatch> list = unconsumedMessages.removeAll(); 844 if (!this.info.isBrowser()) { 845 for (MessageDispatch old : list) { 846 // ensure we don't filter this as a duplicate 847 LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId()); 848 session.connection.rollbackDuplicate(this, old.getMessage()); 849 } 850 } 851 } 852 } 853 854 /** 855 * @throws IllegalStateException 856 */ 857 protected void checkClosed() throws IllegalStateException { 858 if (unconsumedMessages.isClosed()) { 859 throw new IllegalStateException("The Consumer is closed"); 860 } 861 } 862 863 /** 864 * If we have a zero prefetch specified then send a pull command to the 865 * broker to pull a message we are about to receive 866 */ 867 protected void sendPullCommand(long timeout) throws JMSException { 868 clearDeliveredList(); 869 if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 870 MessagePull messagePull = new MessagePull(); 871 messagePull.configure(info); 872 messagePull.setTimeout(timeout); 873 session.asyncSendPacket(messagePull); 874 } 875 } 876 877 protected void checkMessageListener() throws JMSException { 878 session.checkMessageListener(); 879 } 880 881 protected void setOptimizeAcknowledge(boolean value) { 882 if (optimizeAcknowledge && !value) { 883 deliverAcks(); 884 } 885 optimizeAcknowledge = value; 886 } 887 888 protected void setPrefetchSize(int prefetch) { 889 deliverAcks(); 890 this.info.setCurrentPrefetchSize(prefetch); 891 } 892 893 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 894 md.setDeliverySequenceId(session.getNextDeliveryId()); 895 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 896 if (!isAutoAcknowledgeBatch()) { 897 synchronized(deliveredMessages) { 898 deliveredMessages.addFirst(md); 899 } 900 if (session.getTransacted()) { 901 if (transactedIndividualAck) { 902 immediateIndividualTransactedAck(md); 903 } else { 904 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 905 } 906 } 907 } 908 } 909 910 private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException { 911 // acks accumulate on the broker pending transaction completion to indicate 912 // delivery status 913 registerSync(); 914 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 915 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 916 session.sendAck(ack); 917 } 918 919 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { 920 if (unconsumedMessages.isClosed()) { 921 return; 922 } 923 if (messageExpired) { 924 acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); 925 stats.getExpiredMessageCount().increment(); 926 } else { 927 stats.onMessage(); 928 if (session.getTransacted()) { 929 // Do nothing. 930 } else if (isAutoAcknowledgeEach()) { 931 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 932 synchronized (deliveredMessages) { 933 if (!deliveredMessages.isEmpty()) { 934 if (optimizeAcknowledge) { 935 ackCounter++; 936 937 // AMQ-3956 evaluate both expired and normal msgs as 938 // otherwise consumer may get stalled 939 if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { 940 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 941 if (ack != null) { 942 deliveredMessages.clear(); 943 ackCounter = 0; 944 session.sendAck(ack); 945 optimizeAckTimestamp = System.currentTimeMillis(); 946 } 947 // AMQ-3956 - as further optimization send 948 // ack for expired msgs when there are any. 949 // This resets the deliveredCounter to 0 so that 950 // we won't sent standard acks with every msg just 951 // because the deliveredCounter just below 952 // 0.5 * prefetch as used in ackLater() 953 if (pendingAck != null && deliveredCounter > 0) { 954 session.sendAck(pendingAck); 955 pendingAck = null; 956 deliveredCounter = 0; 957 } 958 } 959 } else { 960 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 961 if (ack!=null) { 962 deliveredMessages.clear(); 963 session.sendAck(ack); 964 } 965 } 966 } 967 } 968 deliveryingAcknowledgements.set(false); 969 } 970 } else if (isAutoAcknowledgeBatch()) { 971 ackLater(md, MessageAck.STANDARD_ACK_TYPE); 972 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { 973 boolean messageUnackedByConsumer = false; 974 synchronized (deliveredMessages) { 975 messageUnackedByConsumer = deliveredMessages.contains(md); 976 } 977 if (messageUnackedByConsumer) { 978 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 979 } 980 } 981 else { 982 throw new IllegalStateException("Invalid session state."); 983 } 984 } 985 } 986 987 /** 988 * Creates a MessageAck for all messages contained in deliveredMessages. 989 * Caller should hold the lock for deliveredMessages. 990 * 991 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 992 * @return <code>null</code> if nothing to ack. 993 */ 994 private MessageAck makeAckForAllDeliveredMessages(byte type) { 995 synchronized (deliveredMessages) { 996 if (deliveredMessages.isEmpty()) { 997 return null; 998 } 999 1000 MessageDispatch md = deliveredMessages.getFirst(); 1001 MessageAck ack = new MessageAck(md, type, deliveredMessages.size()); 1002 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId()); 1003 return ack; 1004 } 1005 } 1006 1007 private void ackLater(MessageDispatch md, byte ackType) throws JMSException { 1008 1009 // Don't acknowledge now, but we may need to let the broker know the 1010 // consumer got the message to expand the pre-fetch window 1011 if (session.getTransacted()) { 1012 registerSync(); 1013 } 1014 1015 deliveredCounter++; 1016 1017 MessageAck oldPendingAck = pendingAck; 1018 pendingAck = new MessageAck(md, ackType, deliveredCounter); 1019 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); 1020 if( oldPendingAck==null ) { 1021 pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); 1022 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { 1023 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); 1024 } else { 1025 // old pending ack being superseded by ack of another type, if is is not a delivered 1026 // ack and hence important, send it now so it is not lost. 1027 if (!oldPendingAck.isDeliveredAck()) { 1028 LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck); 1029 session.sendAck(oldPendingAck); 1030 } else { 1031 LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck); 1032 } 1033 } 1034 // AMQ-3956 evaluate both expired and normal msgs as 1035 // otherwise consumer may get stalled 1036 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) { 1037 LOG.debug("ackLater: sending: {}", pendingAck); 1038 session.sendAck(pendingAck); 1039 pendingAck=null; 1040 deliveredCounter = 0; 1041 additionalWindowSize = 0; 1042 } 1043 } 1044 1045 private void registerSync() throws JMSException { 1046 session.doStartTransaction(); 1047 if (!synchronizationRegistered) { 1048 synchronizationRegistered = true; 1049 session.getTransactionContext().addSynchronization(new Synchronization() { 1050 @Override 1051 public void beforeEnd() throws Exception { 1052 if (transactedIndividualAck) { 1053 clearDeliveredList(); 1054 waitForRedeliveries(); 1055 synchronized(deliveredMessages) { 1056 rollbackOnFailedRecoveryRedelivery(); 1057 } 1058 } else { 1059 acknowledge(); 1060 } 1061 synchronizationRegistered = false; 1062 } 1063 1064 @Override 1065 public void afterCommit() throws Exception { 1066 commit(); 1067 synchronizationRegistered = false; 1068 } 1069 1070 @Override 1071 public void afterRollback() throws Exception { 1072 rollback(); 1073 synchronizationRegistered = false; 1074 } 1075 }); 1076 } 1077 } 1078 1079 /** 1080 * Acknowledge all the messages that have been delivered to the client up to 1081 * this point. 1082 * 1083 * @throws JMSException 1084 */ 1085 public void acknowledge() throws JMSException { 1086 clearDeliveredList(); 1087 waitForRedeliveries(); 1088 synchronized(deliveredMessages) { 1089 // Acknowledge all messages so far. 1090 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 1091 if (ack == null) { 1092 return; // no msgs 1093 } 1094 1095 if (session.getTransacted()) { 1096 rollbackOnFailedRecoveryRedelivery(); 1097 session.doStartTransaction(); 1098 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 1099 } 1100 1101 pendingAck = null; 1102 session.sendAck(ack); 1103 1104 // Adjust the counters 1105 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); 1106 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1107 1108 if (!session.getTransacted()) { 1109 deliveredMessages.clear(); 1110 } 1111 } 1112 } 1113 1114 private void waitForRedeliveries() { 1115 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) { 1116 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod; 1117 int numberNotReplayed; 1118 do { 1119 numberNotReplayed = 0; 1120 synchronized(deliveredMessages) { 1121 if (previouslyDeliveredMessages != null) { 1122 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1123 if (!entry.getValue()) { 1124 numberNotReplayed++; 1125 } 1126 } 1127 } 1128 } 1129 if (numberNotReplayed > 0) { 1130 LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}", 1131 numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId); 1132 try { 1133 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4)); 1134 } catch (InterruptedException outOfhere) { 1135 break; 1136 } 1137 } 1138 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis()); 1139 } 1140 } 1141 1142 /* 1143 * called with deliveredMessages locked 1144 */ 1145 private void rollbackOnFailedRecoveryRedelivery() throws JMSException { 1146 if (previouslyDeliveredMessages != null) { 1147 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback 1148 // as messages have been dispatched else where. 1149 int numberNotReplayed = 0; 1150 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1151 if (!entry.getValue()) { 1152 numberNotReplayed++; 1153 LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}", 1154 previouslyDeliveredMessages.transactionId, entry.getKey()); 1155 } 1156 } 1157 if (numberNotReplayed > 0) { 1158 String message = "rolling back transaction (" 1159 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed 1160 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); 1161 LOG.warn(message); 1162 throw new TransactionRolledBackException(message); 1163 } 1164 } 1165 } 1166 1167 void acknowledge(MessageDispatch md) throws JMSException { 1168 acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE); 1169 } 1170 1171 void acknowledge(MessageDispatch md, byte ackType) throws JMSException { 1172 MessageAck ack = new MessageAck(md, ackType, 1); 1173 if (ack.isExpiredAck()) { 1174 ack.setFirstMessageId(ack.getLastMessageId()); 1175 } 1176 session.sendAck(ack); 1177 synchronized(deliveredMessages){ 1178 deliveredMessages.remove(md); 1179 } 1180 } 1181 1182 public void commit() throws JMSException { 1183 synchronized (deliveredMessages) { 1184 deliveredMessages.clear(); 1185 clearPreviouslyDelivered(); 1186 } 1187 redeliveryDelay = 0; 1188 } 1189 1190 public void rollback() throws JMSException { 1191 clearDeliveredList(); 1192 synchronized (unconsumedMessages.getMutex()) { 1193 if (optimizeAcknowledge) { 1194 // remove messages read but not acked at the broker yet through 1195 // optimizeAcknowledge 1196 if (!this.info.isBrowser()) { 1197 synchronized(deliveredMessages) { 1198 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { 1199 // ensure we don't filter this as a duplicate 1200 MessageDispatch md = deliveredMessages.removeLast(); 1201 session.connection.rollbackDuplicate(this, md.getMessage()); 1202 } 1203 } 1204 } 1205 } 1206 synchronized(deliveredMessages) { 1207 rollbackPreviouslyDeliveredAndNotRedelivered(); 1208 if (deliveredMessages.isEmpty()) { 1209 return; 1210 } 1211 1212 // use initial delay for first redelivery 1213 MessageDispatch lastMd = deliveredMessages.getFirst(); 1214 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); 1215 if (currentRedeliveryCount > 0) { 1216 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 1217 } else { 1218 redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 1219 } 1220 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); 1221 1222 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1223 MessageDispatch md = iter.next(); 1224 md.getMessage().onMessageRolledBack(); 1225 // ensure we don't filter this as a duplicate 1226 session.connection.rollbackDuplicate(this, md.getMessage()); 1227 } 1228 1229 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 1230 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { 1231 // We need to NACK the messages so that they get sent to the 1232 // DLQ. 1233 // Acknowledge the last message. 1234 1235 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); 1236 ack.setFirstMessageId(firstMsgId); 1237 ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy 1238 + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause())); 1239 session.sendAck(ack,true); 1240 // Adjust the window size. 1241 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1242 redeliveryDelay = 0; 1243 1244 deliveredCounter -= deliveredMessages.size(); 1245 deliveredMessages.clear(); 1246 1247 } else { 1248 1249 // only redelivery_ack after first delivery 1250 if (currentRedeliveryCount > 0) { 1251 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); 1252 ack.setFirstMessageId(firstMsgId); 1253 session.sendAck(ack,true); 1254 } 1255 1256 // stop the delivery of messages. 1257 if (nonBlockingRedelivery) { 1258 if (!unconsumedMessages.isClosed()) { 1259 1260 final LinkedList<MessageDispatch> pendingRedeliveries = 1261 new LinkedList<MessageDispatch>(deliveredMessages); 1262 1263 Collections.reverse(pendingRedeliveries); 1264 1265 deliveredCounter -= deliveredMessages.size(); 1266 deliveredMessages.clear(); 1267 1268 // Start up the delivery again a little later. 1269 session.getScheduler().executeAfterDelay(new Runnable() { 1270 @Override 1271 public void run() { 1272 try { 1273 if (!unconsumedMessages.isClosed()) { 1274 for(MessageDispatch dispatch : pendingRedeliveries) { 1275 session.dispatch(dispatch); 1276 } 1277 } 1278 } catch (Exception e) { 1279 session.connection.onAsyncException(e); 1280 } 1281 } 1282 }, redeliveryDelay); 1283 } 1284 1285 } else { 1286 unconsumedMessages.stop(); 1287 1288 for (MessageDispatch md : deliveredMessages) { 1289 unconsumedMessages.enqueueFirst(md); 1290 } 1291 1292 deliveredCounter -= deliveredMessages.size(); 1293 deliveredMessages.clear(); 1294 1295 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { 1296 // Start up the delivery again a little later. 1297 session.getScheduler().executeAfterDelay(new Runnable() { 1298 @Override 1299 public void run() { 1300 try { 1301 if (started.get()) { 1302 start(); 1303 } 1304 } catch (JMSException e) { 1305 session.connection.onAsyncException(e); 1306 } 1307 } 1308 }, redeliveryDelay); 1309 } else { 1310 start(); 1311 } 1312 } 1313 } 1314 } 1315 } 1316 if (messageListener.get() != null) { 1317 session.redispatch(this, unconsumedMessages); 1318 } 1319 } 1320 1321 /* 1322 * called with unconsumedMessages && deliveredMessages locked 1323 * remove any message not re-delivered as they can't be replayed to this 1324 * consumer on rollback 1325 */ 1326 private void rollbackPreviouslyDeliveredAndNotRedelivered() { 1327 if (previouslyDeliveredMessages != null) { 1328 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1329 if (!entry.getValue()) { 1330 LOG.trace("rollback non redelivered: {}" + entry.getKey()); 1331 removeFromDeliveredMessages(entry.getKey()); 1332 } 1333 } 1334 clearPreviouslyDelivered(); 1335 } 1336 } 1337 1338 /* 1339 * called with deliveredMessages locked 1340 */ 1341 private void removeFromDeliveredMessages(MessageId key) { 1342 Iterator<MessageDispatch> iterator = deliveredMessages.iterator(); 1343 while (iterator.hasNext()) { 1344 MessageDispatch candidate = iterator.next(); 1345 if (key.equals(candidate.getMessage().getMessageId())) { 1346 session.connection.rollbackDuplicate(this, candidate.getMessage()); 1347 iterator.remove(); 1348 break; 1349 } 1350 } 1351 } 1352 1353 /* 1354 * called with deliveredMessages locked 1355 */ 1356 private void clearPreviouslyDelivered() { 1357 if (previouslyDeliveredMessages != null) { 1358 previouslyDeliveredMessages.clear(); 1359 previouslyDeliveredMessages = null; 1360 } 1361 } 1362 1363 @Override 1364 public void dispatch(MessageDispatch md) { 1365 MessageListener listener = this.messageListener.get(); 1366 try { 1367 clearMessagesInProgress(); 1368 clearDeliveredList(); 1369 synchronized (unconsumedMessages.getMutex()) { 1370 if (!unconsumedMessages.isClosed()) { 1371 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 1372 if (listener != null && unconsumedMessages.isRunning()) { 1373 if (redeliveryExceeded(md)) { 1374 posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); 1375 return; 1376 } 1377 ActiveMQMessage message = createActiveMQMessage(md); 1378 beforeMessageIsConsumed(md); 1379 try { 1380 boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); 1381 if (!expired) { 1382 listener.onMessage(message); 1383 } 1384 afterMessageIsConsumed(md, expired); 1385 } catch (RuntimeException e) { 1386 LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e); 1387 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { 1388 // schedual redelivery and possible dlq processing 1389 md.setRollbackCause(e); 1390 rollback(); 1391 } else { 1392 // Transacted or Client ack: Deliver the 1393 // next message. 1394 afterMessageIsConsumed(md, false); 1395 } 1396 } 1397 } else { 1398 if (!unconsumedMessages.isRunning()) { 1399 // delayed redelivery, ensure it can be re delivered 1400 session.connection.rollbackDuplicate(this, md.getMessage()); 1401 } 1402 1403 if (md.getMessage() == null) { 1404 // End of browse or pull request timeout. 1405 unconsumedMessages.enqueue(md); 1406 } else { 1407 if (!consumeExpiredMessage(md)) { 1408 unconsumedMessages.enqueue(md); 1409 if (availableListener != null) { 1410 availableListener.onMessageAvailable(this); 1411 } 1412 } else { 1413 beforeMessageIsConsumed(md); 1414 afterMessageIsConsumed(md, true); 1415 1416 // Pull consumer needs to check if pull timed out and send 1417 // a new pull command if not. 1418 if (info.getCurrentPrefetchSize() == 0) { 1419 unconsumedMessages.enqueue(null); 1420 } 1421 } 1422 } 1423 } 1424 } else { 1425 // deal with duplicate delivery 1426 ConsumerId consumerWithPendingTransaction; 1427 if (redeliveryExpectedInCurrentTransaction(md, true)) { 1428 LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage()); 1429 if (transactedIndividualAck) { 1430 immediateIndividualTransactedAck(md); 1431 } else { 1432 session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1)); 1433 } 1434 } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) { 1435 LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction); 1436 session.getConnection().rollbackDuplicate(this, md.getMessage()); 1437 dispatch(md); 1438 } else { 1439 LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md); 1440 posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); 1441 } 1442 } 1443 } 1444 } 1445 if (++dispatchedCount % 1000 == 0) { 1446 dispatchedCount = 0; 1447 Thread.yield(); 1448 } 1449 } catch (Exception e) { 1450 session.connection.onClientInternalException(e); 1451 } 1452 } 1453 1454 private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) { 1455 if (session.isTransacted()) { 1456 synchronized (deliveredMessages) { 1457 if (previouslyDeliveredMessages != null) { 1458 if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) { 1459 if (markReceipt) { 1460 previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); 1461 } 1462 return true; 1463 } 1464 } 1465 } 1466 } 1467 return false; 1468 } 1469 1470 private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) { 1471 for (ActiveMQSession activeMQSession: session.connection.getSessions()) { 1472 for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) { 1473 if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) { 1474 return activeMQMessageConsumer.getConsumerId(); 1475 } 1476 } 1477 } 1478 return null; 1479 } 1480 1481 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again 1482 private void clearDeliveredList() { 1483 if (clearDeliveredList) { 1484 synchronized (deliveredMessages) { 1485 if (clearDeliveredList) { 1486 if (!deliveredMessages.isEmpty()) { 1487 if (session.isTransacted()) { 1488 1489 if (previouslyDeliveredMessages == null) { 1490 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); 1491 } 1492 for (MessageDispatch delivered : deliveredMessages) { 1493 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); 1494 } 1495 LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt", 1496 getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size()); 1497 } else { 1498 if (session.isClientAcknowledge()) { 1499 LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); 1500 // allow redelivery 1501 if (!this.info.isBrowser()) { 1502 for (MessageDispatch md: deliveredMessages) { 1503 this.session.connection.rollbackDuplicate(this, md.getMessage()); 1504 } 1505 } 1506 } 1507 LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); 1508 deliveredMessages.clear(); 1509 pendingAck = null; 1510 } 1511 } 1512 clearDeliveredList = false; 1513 } 1514 } 1515 } 1516 } 1517 1518 public int getMessageSize() { 1519 return unconsumedMessages.size(); 1520 } 1521 1522 public void start() throws JMSException { 1523 if (unconsumedMessages.isClosed()) { 1524 return; 1525 } 1526 started.set(true); 1527 unconsumedMessages.start(); 1528 session.executor.wakeup(); 1529 } 1530 1531 public void stop() { 1532 started.set(false); 1533 unconsumedMessages.stop(); 1534 } 1535 1536 @Override 1537 public String toString() { 1538 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() 1539 + " }"; 1540 } 1541 1542 /** 1543 * Delivers a message to the message listener. 1544 * 1545 * @return 1546 * @throws JMSException 1547 */ 1548 public boolean iterate() { 1549 MessageListener listener = this.messageListener.get(); 1550 if (listener != null) { 1551 MessageDispatch md = unconsumedMessages.dequeueNoWait(); 1552 if (md != null) { 1553 dispatch(md); 1554 return true; 1555 } 1556 } 1557 return false; 1558 } 1559 1560 public boolean isInUse(ActiveMQTempDestination destination) { 1561 return info.getDestination().equals(destination); 1562 } 1563 1564 public long getLastDeliveredSequenceId() { 1565 return lastDeliveredSequenceId; 1566 } 1567 1568 public IOException getFailureError() { 1569 return failureError; 1570 } 1571 1572 public void setFailureError(IOException failureError) { 1573 this.failureError = failureError; 1574 } 1575 1576 /** 1577 * @return the optimizedAckScheduledAckInterval 1578 */ 1579 public long getOptimizedAckScheduledAckInterval() { 1580 return optimizedAckScheduledAckInterval; 1581 } 1582 1583 /** 1584 * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set 1585 */ 1586 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException { 1587 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; 1588 1589 if (this.optimizedAckTask != null) { 1590 try { 1591 this.session.connection.getScheduler().cancel(optimizedAckTask); 1592 } catch (JMSException e) { 1593 LOG.debug("Caught exception while cancelling old optimized ack task", e); 1594 throw e; 1595 } 1596 this.optimizedAckTask = null; 1597 } 1598 1599 // Should we periodically send out all outstanding acks. 1600 if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) { 1601 this.optimizedAckTask = new Runnable() { 1602 1603 @Override 1604 public void run() { 1605 try { 1606 if (optimizeAcknowledge && !unconsumedMessages.isClosed()) { 1607 LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId()); 1608 deliverAcks(); 1609 } 1610 } catch (Exception e) { 1611 LOG.debug("Optimized Ack Task caught exception during ack", e); 1612 } 1613 } 1614 }; 1615 1616 try { 1617 this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval); 1618 } catch (JMSException e) { 1619 LOG.debug("Caught exception while scheduling new optimized ack task", e); 1620 throw e; 1621 } 1622 } 1623 } 1624 1625 public boolean hasMessageListener() { 1626 return messageListener.get() != null; 1627 } 1628 1629 public boolean isConsumerExpiryCheckEnabled() { 1630 return consumerExpiryCheckEnabled; 1631 } 1632 1633 public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { 1634 this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; 1635 } 1636}