001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Iterator; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicReference; 033 034import javax.jms.IllegalStateException; 035import javax.jms.InvalidDestinationException; 036import javax.jms.JMSException; 037import javax.jms.Message; 038import javax.jms.MessageConsumer; 039import javax.jms.MessageListener; 040import javax.jms.TransactionRolledBackException; 041 042import org.apache.activemq.blob.BlobDownloader; 043import org.apache.activemq.command.*; 044import org.apache.activemq.management.JMSConsumerStatsImpl; 045import org.apache.activemq.management.StatsCapable; 046import org.apache.activemq.management.StatsImpl; 047import org.apache.activemq.selector.SelectorParser; 048import org.apache.activemq.transaction.Synchronization; 049import org.apache.activemq.util.Callback; 050import org.apache.activemq.util.IntrospectionSupport; 051import org.apache.activemq.util.JMSExceptionSupport; 052import org.apache.activemq.util.ThreadPoolUtils; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 058 * from a destination. A <CODE> MessageConsumer</CODE> object is created by 059 * passing a <CODE>Destination</CODE> object to a message-consumer creation 060 * method supplied by a session. 061 * <P> 062 * <CODE>MessageConsumer</CODE> is the parent interface for all message 063 * consumers. 064 * <P> 065 * A message consumer can be created with a message selector. A message selector 066 * allows the client to restrict the messages delivered to the message consumer 067 * to those that match the selector. 068 * <P> 069 * A client may either synchronously receive a message consumer's messages or 070 * have the consumer asynchronously deliver them as they arrive. 071 * <P> 072 * For synchronous receipt, a client can request the next message from a message 073 * consumer using one of its <CODE> receive</CODE> methods. There are several 074 * variations of <CODE>receive</CODE> that allow a client to poll or wait for 075 * the next message. 076 * <P> 077 * For asynchronous delivery, a client can register a 078 * <CODE>MessageListener</CODE> object with a message consumer. As messages 079 * arrive at the message consumer, it delivers them by calling the 080 * <CODE>MessageListener</CODE>'s<CODE> 081 * onMessage</CODE> method. 082 * <P> 083 * It is a client programming error for a <CODE>MessageListener</CODE> to 084 * throw an exception. 085 * 086 * 087 * @see javax.jms.MessageConsumer 088 * @see javax.jms.QueueReceiver 089 * @see javax.jms.TopicSubscriber 090 * @see javax.jms.Session 091 */ 092public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { 093 094 @SuppressWarnings("serial") 095 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> { 096 final TransactionId transactionId; 097 public PreviouslyDeliveredMap(TransactionId transactionId) { 098 this.transactionId = transactionId; 099 } 100 } 101 class PreviouslyDelivered { 102 org.apache.activemq.command.Message message; 103 boolean redelivered; 104 105 PreviouslyDelivered(MessageDispatch messageDispatch) { 106 message = messageDispatch.getMessage(); 107 } 108 109 PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered) { 110 message = messageDispatch.getMessage(); 111 this.redelivered = redelivered; 112 } 113 } 114 115 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class); 116 protected final ActiveMQSession session; 117 protected final ConsumerInfo info; 118 119 // These are the messages waiting to be delivered to the client 120 protected final MessageDispatchChannel unconsumedMessages; 121 122 // The are the messages that were delivered to the consumer but that have 123 // not been acknowledged. It's kept in reverse order since we 124 // Always walk list in reverse order. 125 protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); 126 // track duplicate deliveries in a transaction such that the tx integrity can be validated 127 private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages; 128 private int deliveredCounter; 129 private int additionalWindowSize; 130 private long redeliveryDelay; 131 private int ackCounter; 132 private int dispatchedCount; 133 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>(); 134 private final JMSConsumerStatsImpl stats; 135 136 private final String selector; 137 private boolean synchronizationRegistered; 138 private final AtomicBoolean started = new AtomicBoolean(false); 139 140 private MessageAvailableListener availableListener; 141 142 private RedeliveryPolicy redeliveryPolicy; 143 private boolean optimizeAcknowledge; 144 private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); 145 private ExecutorService executorService; 146 private MessageTransformer transformer; 147 private volatile boolean clearDeliveredList; 148 AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0); 149 150 private MessageAck pendingAck; 151 private long lastDeliveredSequenceId = -1; 152 153 private IOException failureError; 154 155 private long optimizeAckTimestamp = System.currentTimeMillis(); 156 private long optimizeAcknowledgeTimeOut = 0; 157 private long optimizedAckScheduledAckInterval = 0; 158 private Runnable optimizedAckTask; 159 private long failoverRedeliveryWaitPeriod = 0; 160 private boolean transactedIndividualAck = false; 161 private boolean nonBlockingRedelivery = false; 162 private boolean consumerExpiryCheckEnabled = true; 163 164 /** 165 * Create a MessageConsumer 166 * 167 * @param session 168 * @param dest 169 * @param name 170 * @param selector 171 * @param prefetch 172 * @param maximumPendingMessageCount 173 * @param noLocal 174 * @param browser 175 * @param dispatchAsync 176 * @param messageListener 177 * @throws JMSException 178 */ 179 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, 180 String name, String selector, int prefetch, 181 int maximumPendingMessageCount, boolean noLocal, boolean browser, 182 boolean dispatchAsync, MessageListener messageListener) throws JMSException { 183 if (dest == null) { 184 throw new InvalidDestinationException("Don't understand null destinations"); 185 } else if (dest.getPhysicalName() == null) { 186 throw new InvalidDestinationException("The destination object was not given a physical name."); 187 } else if (dest.isTemporary()) { 188 String physicalName = dest.getPhysicalName(); 189 190 if (physicalName == null) { 191 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 192 } 193 194 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); 195 196 if (physicalName.indexOf(connectionID) < 0) { 197 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection"); 198 } 199 200 if (session.connection.isDeleted(dest)) { 201 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted"); 202 } 203 if (prefetch < 0) { 204 throw new JMSException("Cannot have a prefetch size less than zero"); 205 } 206 } 207 if (session.connection.isMessagePrioritySupported()) { 208 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); 209 }else { 210 this.unconsumedMessages = new FifoMessageDispatchChannel(); 211 } 212 213 this.session = session; 214 this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest); 215 if (this.redeliveryPolicy == null) { 216 this.redeliveryPolicy = new RedeliveryPolicy(); 217 } 218 setTransformer(session.getTransformer()); 219 220 this.info = new ConsumerInfo(consumerId); 221 this.info.setExclusive(this.session.connection.isExclusiveConsumer()); 222 this.info.setClientId(this.session.connection.getClientID()); 223 this.info.setSubscriptionName(name); 224 this.info.setPrefetchSize(prefetch); 225 this.info.setCurrentPrefetchSize(prefetch); 226 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); 227 this.info.setNoLocal(noLocal); 228 this.info.setDispatchAsync(dispatchAsync); 229 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); 230 this.info.setSelector(null); 231 232 // Allows the options on the destination to configure the consumerInfo 233 if (dest.getOptions() != null) { 234 Map<String, Object> options = IntrospectionSupport.extractProperties( 235 new HashMap<String, Object>(dest.getOptions()), "consumer."); 236 IntrospectionSupport.setProperties(this.info, options); 237 if (options.size() > 0) { 238 String msg = "There are " + options.size() 239 + " consumer options that couldn't be set on the consumer." 240 + " Check the options are spelled correctly." 241 + " Unknown parameters=[" + options + "]." 242 + " This consumer cannot be started."; 243 LOG.warn(msg); 244 throw new ConfigurationException(msg); 245 } 246 } 247 248 this.info.setDestination(dest); 249 this.info.setBrowser(browser); 250 if (selector != null && selector.trim().length() != 0) { 251 // Validate the selector 252 SelectorParser.parse(selector); 253 this.info.setSelector(selector); 254 this.selector = selector; 255 } else if (info.getSelector() != null) { 256 // Validate the selector 257 SelectorParser.parse(this.info.getSelector()); 258 this.selector = this.info.getSelector(); 259 } else { 260 this.selector = null; 261 } 262 263 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); 264 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() 265 && !info.isBrowser(); 266 if (this.optimizeAcknowledge) { 267 this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut(); 268 setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval()); 269 } 270 271 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); 272 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); 273 this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); 274 this.transactedIndividualAck = session.connection.isTransactedIndividualAck() 275 || this.nonBlockingRedelivery 276 || session.connection.isMessagePrioritySupported(); 277 this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled(); 278 if (messageListener != null) { 279 setMessageListener(messageListener); 280 } 281 try { 282 this.session.addConsumer(this); 283 this.session.syncSendPacket(info); 284 } catch (JMSException e) { 285 this.session.removeConsumer(this); 286 throw e; 287 } 288 289 if (session.connection.isStarted()) { 290 start(); 291 } 292 } 293 294 private boolean isAutoAcknowledgeEach() { 295 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() ); 296 } 297 298 private boolean isAutoAcknowledgeBatch() { 299 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; 300 } 301 302 @Override 303 public StatsImpl getStats() { 304 return stats; 305 } 306 307 public JMSConsumerStatsImpl getConsumerStats() { 308 return stats; 309 } 310 311 public RedeliveryPolicy getRedeliveryPolicy() { 312 return redeliveryPolicy; 313 } 314 315 /** 316 * Sets the redelivery policy used when messages are redelivered 317 */ 318 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 319 this.redeliveryPolicy = redeliveryPolicy; 320 } 321 322 public MessageTransformer getTransformer() { 323 return transformer; 324 } 325 326 /** 327 * Sets the transformer used to transform messages before they are sent on 328 * to the JMS bus 329 */ 330 public void setTransformer(MessageTransformer transformer) { 331 this.transformer = transformer; 332 } 333 334 /** 335 * @return Returns the value. 336 */ 337 public ConsumerId getConsumerId() { 338 return info.getConsumerId(); 339 } 340 341 /** 342 * @return the consumer name - used for durable consumers 343 */ 344 public String getConsumerName() { 345 return this.info.getSubscriptionName(); 346 } 347 348 /** 349 * @return true if this consumer does not accept locally produced messages 350 */ 351 protected boolean isNoLocal() { 352 return info.isNoLocal(); 353 } 354 355 /** 356 * Retrieve is a browser 357 * 358 * @return true if a browser 359 */ 360 protected boolean isBrowser() { 361 return info.isBrowser(); 362 } 363 364 /** 365 * @return ActiveMQDestination 366 */ 367 protected ActiveMQDestination getDestination() { 368 return info.getDestination(); 369 } 370 371 /** 372 * @return Returns the prefetchNumber. 373 */ 374 public int getPrefetchNumber() { 375 return info.getPrefetchSize(); 376 } 377 378 /** 379 * @return true if this is a durable topic subscriber 380 */ 381 public boolean isDurableSubscriber() { 382 return info.getSubscriptionName() != null && info.getDestination().isTopic(); 383 } 384 385 /** 386 * Gets this message consumer's message selector expression. 387 * 388 * @return this message consumer's message selector, or null if no message 389 * selector exists for the message consumer (that is, if the message 390 * selector was not set or was set to null or the empty string) 391 * @throws JMSException if the JMS provider fails to receive the next 392 * message due to some internal error. 393 */ 394 @Override 395 public String getMessageSelector() throws JMSException { 396 checkClosed(); 397 return selector; 398 } 399 400 /** 401 * Gets the message consumer's <CODE>MessageListener</CODE>. 402 * 403 * @return the listener for the message consumer, or null if no listener is 404 * set 405 * @throws JMSException if the JMS provider fails to get the message 406 * listener due to some internal error. 407 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) 408 */ 409 @Override 410 public MessageListener getMessageListener() throws JMSException { 411 checkClosed(); 412 return this.messageListener.get(); 413 } 414 415 /** 416 * Sets the message consumer's <CODE>MessageListener</CODE>. 417 * <P> 418 * Setting the message listener to null is the equivalent of unsetting the 419 * message listener for the message consumer. 420 * <P> 421 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> 422 * while messages are being consumed by an existing listener or the consumer 423 * is being used to consume messages synchronously is undefined. 424 * 425 * @param listener the listener to which the messages are to be delivered 426 * @throws JMSException if the JMS provider fails to receive the next 427 * message due to some internal error. 428 * @see javax.jms.MessageConsumer#getMessageListener 429 */ 430 @Override 431 public void setMessageListener(MessageListener listener) throws JMSException { 432 checkClosed(); 433 if (info.getPrefetchSize() == 0) { 434 throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); 435 } 436 if (listener != null) { 437 boolean wasRunning = session.isRunning(); 438 if (wasRunning) { 439 session.stop(); 440 } 441 442 this.messageListener.set(listener); 443 session.redispatch(this, unconsumedMessages); 444 445 if (wasRunning) { 446 session.start(); 447 } 448 } else { 449 this.messageListener.set(null); 450 } 451 } 452 453 @Override 454 public MessageAvailableListener getAvailableListener() { 455 return availableListener; 456 } 457 458 /** 459 * Sets the listener used to notify synchronous consumers that there is a 460 * message available so that the {@link MessageConsumer#receiveNoWait()} can 461 * be called. 462 */ 463 @Override 464 public void setAvailableListener(MessageAvailableListener availableListener) { 465 this.availableListener = availableListener; 466 } 467 468 /** 469 * Used to get an enqueued message from the unconsumedMessages list. The 470 * amount of time this method blocks is based on the timeout value. - if 471 * timeout==-1 then it blocks until a message is received. - if timeout==0 472 * then it it tries to not block at all, it returns a message if it is 473 * available - if timeout>0 then it blocks up to timeout amount of time. 474 * Expired messages will consumed by this method. 475 * 476 * @throws JMSException 477 * @return null if we timeout or if the consumer is closed. 478 */ 479 private MessageDispatch dequeue(long timeout) throws JMSException { 480 try { 481 long deadline = 0; 482 if (timeout > 0) { 483 deadline = System.currentTimeMillis() + timeout; 484 } 485 while (true) { 486 MessageDispatch md = unconsumedMessages.dequeue(timeout); 487 if (md == null) { 488 if (timeout > 0 && !unconsumedMessages.isClosed()) { 489 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 490 } else { 491 if (failureError != null) { 492 throw JMSExceptionSupport.create(failureError); 493 } else { 494 return null; 495 } 496 } 497 } else if (md.getMessage() == null) { 498 return null; 499 } else if (consumeExpiredMessage(md)) { 500 LOG.debug("{} received expired message: {}", getConsumerId(), md); 501 beforeMessageIsConsumed(md); 502 afterMessageIsConsumed(md, true); 503 if (timeout > 0) { 504 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 505 } 506 sendPullCommand(timeout); 507 } else if (redeliveryExceeded(md)) { 508 LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md); 509 posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); 510 if (timeout > 0) { 511 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 512 } 513 sendPullCommand(timeout); 514 } else { 515 if (LOG.isTraceEnabled()) { 516 LOG.trace(getConsumerId() + " received message: " + md); 517 } 518 return md; 519 } 520 } 521 } catch (InterruptedException e) { 522 Thread.currentThread().interrupt(); 523 throw JMSExceptionSupport.create(e); 524 } 525 } 526 527 private boolean consumeExpiredMessage(MessageDispatch dispatch) { 528 return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired(); 529 } 530 531 private void posionAck(MessageDispatch md, String cause) throws JMSException { 532 MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 533 posionAck.setFirstMessageId(md.getMessage().getMessageId()); 534 posionAck.setPoisonCause(new Throwable(cause)); 535 session.sendAck(posionAck); 536 } 537 538 private boolean redeliveryExceeded(MessageDispatch md) { 539 try { 540 return session.getTransacted() 541 && redeliveryPolicy != null 542 && redeliveryPolicy.isPreDispatchCheck() 543 && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 544 && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries() 545 // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin 546 && md.getMessage().getProperty("redeliveryDelay") == null; 547 } catch (Exception ignored) { 548 return false; 549 } 550 } 551 552 /** 553 * Receives the next message produced for this message consumer. 554 * <P> 555 * This call blocks indefinitely until a message is produced or until this 556 * message consumer is closed. 557 * <P> 558 * If this <CODE>receive</CODE> is done within a transaction, the consumer 559 * retains the message until the transaction commits. 560 * 561 * @return the next message produced for this message consumer, or null if 562 * this message consumer is concurrently closed 563 */ 564 @Override 565 public Message receive() throws JMSException { 566 checkClosed(); 567 checkMessageListener(); 568 569 sendPullCommand(0); 570 MessageDispatch md = dequeue(-1); 571 if (md == null) { 572 return null; 573 } 574 575 beforeMessageIsConsumed(md); 576 afterMessageIsConsumed(md, false); 577 578 return createActiveMQMessage(md); 579 } 580 581 /** 582 * @param md 583 * @return 584 */ 585 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 586 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); 587 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { 588 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy())); 589 } 590 if (m.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) { 591 ((ActiveMQObjectMessage)m).setTrustAllPackages(session.getConnection().isTrustAllPackages()); 592 ((ActiveMQObjectMessage)m).setTrustedPackages(session.getConnection().getTrustedPackages()); 593 } 594 if (transformer != null) { 595 Message transformedMessage = transformer.consumerTransform(session, this, m); 596 if (transformedMessage != null) { 597 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); 598 } 599 } 600 if (session.isClientAcknowledge()) { 601 m.setAcknowledgeCallback(new Callback() { 602 @Override 603 public void execute() throws Exception { 604 session.checkClosed(); 605 session.acknowledge(); 606 } 607 }); 608 } else if (session.isIndividualAcknowledge()) { 609 m.setAcknowledgeCallback(new Callback() { 610 @Override 611 public void execute() throws Exception { 612 session.checkClosed(); 613 acknowledge(md); 614 } 615 }); 616 } 617 return m; 618 } 619 620 /** 621 * Receives the next message that arrives within the specified timeout 622 * interval. 623 * <P> 624 * This call blocks until a message arrives, the timeout expires, or this 625 * message consumer is closed. A <CODE>timeout</CODE> of zero never 626 * expires, and the call blocks indefinitely. 627 * 628 * @param timeout the timeout value (in milliseconds), a time out of zero 629 * never expires. 630 * @return the next message produced for this message consumer, or null if 631 * the timeout expires or this message consumer is concurrently 632 * closed 633 */ 634 @Override 635 public Message receive(long timeout) throws JMSException { 636 checkClosed(); 637 checkMessageListener(); 638 if (timeout == 0) { 639 return this.receive(); 640 } 641 642 sendPullCommand(timeout); 643 while (timeout > 0) { 644 645 MessageDispatch md; 646 if (info.getPrefetchSize() == 0) { 647 md = dequeue(-1); // We let the broker let us know when we timeout. 648 } else { 649 md = dequeue(timeout); 650 } 651 652 if (md == null) { 653 return null; 654 } 655 656 beforeMessageIsConsumed(md); 657 afterMessageIsConsumed(md, false); 658 return createActiveMQMessage(md); 659 } 660 return null; 661 } 662 663 /** 664 * Receives the next message if one is immediately available. 665 * 666 * @return the next message produced for this message consumer, or null if 667 * one is not available 668 * @throws JMSException if the JMS provider fails to receive the next 669 * message due to some internal error. 670 */ 671 @Override 672 public Message receiveNoWait() throws JMSException { 673 checkClosed(); 674 checkMessageListener(); 675 sendPullCommand(-1); 676 677 MessageDispatch md; 678 if (info.getPrefetchSize() == 0) { 679 md = dequeue(-1); // We let the broker let us know when we 680 // timeout. 681 } else { 682 md = dequeue(0); 683 } 684 685 if (md == null) { 686 return null; 687 } 688 689 beforeMessageIsConsumed(md); 690 afterMessageIsConsumed(md, false); 691 return createActiveMQMessage(md); 692 } 693 694 /** 695 * Closes the message consumer. 696 * <P> 697 * Since a provider may allocate some resources on behalf of a <CODE> 698 * MessageConsumer</CODE> 699 * outside the Java virtual machine, clients should close them when they are 700 * not needed. Relying on garbage collection to eventually reclaim these 701 * resources may not be timely enough. 702 * <P> 703 * This call blocks until a <CODE>receive</CODE> or message listener in 704 * progress has completed. A blocked message consumer <CODE>receive </CODE> 705 * call returns null when this message consumer is closed. 706 * 707 * @throws JMSException if the JMS provider fails to close the consumer due 708 * to some internal error. 709 */ 710 @Override 711 public void close() throws JMSException { 712 if (!unconsumedMessages.isClosed()) { 713 if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) { 714 session.getTransactionContext().addSynchronization(new Synchronization() { 715 @Override 716 public void afterCommit() throws Exception { 717 doClose(); 718 } 719 720 @Override 721 public void afterRollback() throws Exception { 722 doClose(); 723 } 724 }); 725 } else { 726 doClose(); 727 } 728 } 729 } 730 731 void doClose() throws JMSException { 732 dispose(); 733 RemoveInfo removeCommand = info.createRemoveCommand(); 734 LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId); 735 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 736 this.session.asyncSendPacket(removeCommand); 737 } 738 739 void inProgressClearRequired() { 740 inProgressClearRequiredFlag.incrementAndGet(); 741 // deal with delivered messages async to avoid lock contention with in progress acks 742 clearDeliveredList = true; 743 // force a rollback if we will be acking in a transaction after/during failover 744 // bc acks are async they may not get there reliably on reconnect and the consumer 745 // may not be aware of the reconnect in a timely fashion if in onMessage 746 if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) { 747 session.getTransactionContext().setRollbackOnly(true); 748 } 749 } 750 751 void clearMessagesInProgress() { 752 if (inProgressClearRequiredFlag.get() > 0) { 753 synchronized (unconsumedMessages.getMutex()) { 754 if (inProgressClearRequiredFlag.get() > 0) { 755 LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size()); 756 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer 757 List<MessageDispatch> list = unconsumedMessages.removeAll(); 758 if (!this.info.isBrowser()) { 759 for (MessageDispatch old : list) { 760 session.connection.rollbackDuplicate(this, old.getMessage()); 761 } 762 } 763 // allow dispatch on this connection to resume 764 session.connection.transportInterruptionProcessingComplete(); 765 inProgressClearRequiredFlag.set(0); 766 767 // Wake up any blockers and allow them to recheck state. 768 unconsumedMessages.getMutex().notifyAll(); 769 } 770 } 771 clearDeliveredList(); 772 } 773 } 774 775 void deliverAcks() { 776 MessageAck ack = null; 777 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 778 if (isAutoAcknowledgeEach()) { 779 synchronized(deliveredMessages) { 780 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 781 if (ack != null) { 782 deliveredMessages.clear(); 783 ackCounter = 0; 784 } else { 785 ack = pendingAck; 786 pendingAck = null; 787 } 788 } 789 } else if (pendingAck != null && pendingAck.isStandardAck()) { 790 ack = pendingAck; 791 pendingAck = null; 792 } 793 if (ack != null) { 794 final MessageAck ackToSend = ack; 795 796 if (executorService == null) { 797 executorService = Executors.newSingleThreadExecutor(); 798 } 799 executorService.submit(new Runnable() { 800 @Override 801 public void run() { 802 try { 803 session.sendAck(ackToSend,true); 804 } catch (JMSException e) { 805 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); 806 } finally { 807 deliveryingAcknowledgements.set(false); 808 } 809 } 810 }); 811 } else { 812 deliveryingAcknowledgements.set(false); 813 } 814 } 815 } 816 817 public void dispose() throws JMSException { 818 if (!unconsumedMessages.isClosed()) { 819 820 // Do we have any acks we need to send out before closing? 821 // Ack any delivered messages now. 822 if (!session.getTransacted()) { 823 deliverAcks(); 824 if (isAutoAcknowledgeBatch()) { 825 acknowledge(); 826 } 827 } 828 if (executorService != null) { 829 ThreadPoolUtils.shutdownGraceful(executorService, 60000L); 830 executorService = null; 831 } 832 if (optimizedAckTask != null) { 833 this.session.connection.getScheduler().cancel(optimizedAckTask); 834 optimizedAckTask = null; 835 } 836 837 if (session.isClientAcknowledge()) { 838 if (!this.info.isBrowser()) { 839 // rollback duplicates that aren't acknowledged 840 List<MessageDispatch> tmp = null; 841 synchronized (this.deliveredMessages) { 842 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages); 843 } 844 for (MessageDispatch old : tmp) { 845 this.session.connection.rollbackDuplicate(this, old.getMessage()); 846 } 847 tmp.clear(); 848 } 849 } 850 if (!session.isTransacted()) { 851 synchronized(deliveredMessages) { 852 deliveredMessages.clear(); 853 } 854 } 855 unconsumedMessages.close(); 856 this.session.removeConsumer(this); 857 List<MessageDispatch> list = unconsumedMessages.removeAll(); 858 if (!this.info.isBrowser()) { 859 for (MessageDispatch old : list) { 860 // ensure we don't filter this as a duplicate 861 if (old.getMessage() != null) { 862 LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId()); 863 } 864 session.connection.rollbackDuplicate(this, old.getMessage()); 865 } 866 } 867 } 868 if (previouslyDeliveredMessages != null) { 869 for (PreviouslyDelivered previouslyDelivered : previouslyDeliveredMessages.values()) { 870 session.connection.rollbackDuplicate(this, previouslyDelivered.message); 871 } 872 } 873 clearPreviouslyDelivered(); 874 } 875 876 /** 877 * @throws IllegalStateException 878 */ 879 protected void checkClosed() throws IllegalStateException { 880 if (unconsumedMessages.isClosed()) { 881 throw new IllegalStateException("The Consumer is closed"); 882 } 883 } 884 885 /** 886 * If we have a zero prefetch specified then send a pull command to the 887 * broker to pull a message we are about to receive 888 */ 889 protected void sendPullCommand(long timeout) throws JMSException { 890 clearDeliveredList(); 891 if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 892 MessagePull messagePull = new MessagePull(); 893 messagePull.configure(info); 894 messagePull.setTimeout(timeout); 895 session.asyncSendPacket(messagePull); 896 } 897 } 898 899 protected void checkMessageListener() throws JMSException { 900 session.checkMessageListener(); 901 } 902 903 protected void setOptimizeAcknowledge(boolean value) { 904 if (optimizeAcknowledge && !value) { 905 deliverAcks(); 906 } 907 optimizeAcknowledge = value; 908 } 909 910 protected void setPrefetchSize(int prefetch) { 911 deliverAcks(); 912 this.info.setCurrentPrefetchSize(prefetch); 913 } 914 915 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 916 md.setDeliverySequenceId(session.getNextDeliveryId()); 917 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 918 if (!isAutoAcknowledgeBatch()) { 919 synchronized(deliveredMessages) { 920 deliveredMessages.addFirst(md); 921 } 922 if (session.getTransacted()) { 923 if (transactedIndividualAck) { 924 immediateIndividualTransactedAck(md); 925 } else { 926 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 927 } 928 } 929 } 930 } 931 932 private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException { 933 // acks accumulate on the broker pending transaction completion to indicate 934 // delivery status 935 registerSync(); 936 MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 937 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 938 session.sendAck(ack); 939 } 940 941 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { 942 if (unconsumedMessages.isClosed()) { 943 return; 944 } 945 if (messageExpired) { 946 acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); 947 stats.getExpiredMessageCount().increment(); 948 } else { 949 stats.onMessage(); 950 if (session.getTransacted()) { 951 // Do nothing. 952 } else if (isAutoAcknowledgeEach()) { 953 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 954 synchronized (deliveredMessages) { 955 if (!deliveredMessages.isEmpty()) { 956 if (optimizeAcknowledge) { 957 ackCounter++; 958 959 // AMQ-3956 evaluate both expired and normal msgs as 960 // otherwise consumer may get stalled 961 if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { 962 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 963 if (ack != null) { 964 deliveredMessages.clear(); 965 ackCounter = 0; 966 session.sendAck(ack); 967 optimizeAckTimestamp = System.currentTimeMillis(); 968 } 969 // AMQ-3956 - as further optimization send 970 // ack for expired msgs when there are any. 971 // This resets the deliveredCounter to 0 so that 972 // we won't sent standard acks with every msg just 973 // because the deliveredCounter just below 974 // 0.5 * prefetch as used in ackLater() 975 if (pendingAck != null && deliveredCounter > 0) { 976 session.sendAck(pendingAck); 977 pendingAck = null; 978 deliveredCounter = 0; 979 } 980 } 981 } else { 982 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 983 if (ack!=null) { 984 deliveredMessages.clear(); 985 session.sendAck(ack); 986 } 987 } 988 } 989 } 990 deliveryingAcknowledgements.set(false); 991 } 992 } else if (isAutoAcknowledgeBatch()) { 993 ackLater(md, MessageAck.STANDARD_ACK_TYPE); 994 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { 995 boolean messageUnackedByConsumer = false; 996 synchronized (deliveredMessages) { 997 messageUnackedByConsumer = deliveredMessages.contains(md); 998 } 999 if (messageUnackedByConsumer) { 1000 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 1001 } 1002 } 1003 else { 1004 throw new IllegalStateException("Invalid session state."); 1005 } 1006 } 1007 } 1008 1009 /** 1010 * Creates a MessageAck for all messages contained in deliveredMessages. 1011 * Caller should hold the lock for deliveredMessages. 1012 * 1013 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 1014 * @return <code>null</code> if nothing to ack. 1015 */ 1016 private MessageAck makeAckForAllDeliveredMessages(byte type) { 1017 synchronized (deliveredMessages) { 1018 if (deliveredMessages.isEmpty()) { 1019 return null; 1020 } 1021 1022 MessageDispatch md = deliveredMessages.getFirst(); 1023 MessageAck ack = new MessageAck(md, type, deliveredMessages.size()); 1024 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId()); 1025 return ack; 1026 } 1027 } 1028 1029 private void ackLater(MessageDispatch md, byte ackType) throws JMSException { 1030 1031 // Don't acknowledge now, but we may need to let the broker know the 1032 // consumer got the message to expand the pre-fetch window 1033 if (session.getTransacted()) { 1034 registerSync(); 1035 } 1036 1037 deliveredCounter++; 1038 1039 MessageAck oldPendingAck = pendingAck; 1040 pendingAck = new MessageAck(md, ackType, deliveredCounter); 1041 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); 1042 if( oldPendingAck==null ) { 1043 pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); 1044 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { 1045 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); 1046 } else { 1047 // old pending ack being superseded by ack of another type, if is is not a delivered 1048 // ack and hence important, send it now so it is not lost. 1049 if (!oldPendingAck.isDeliveredAck()) { 1050 LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck); 1051 session.sendAck(oldPendingAck); 1052 } else { 1053 LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck); 1054 } 1055 } 1056 // AMQ-3956 evaluate both expired and normal msgs as 1057 // otherwise consumer may get stalled 1058 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) { 1059 LOG.debug("ackLater: sending: {}", pendingAck); 1060 session.sendAck(pendingAck); 1061 pendingAck=null; 1062 deliveredCounter = 0; 1063 additionalWindowSize = 0; 1064 } 1065 } 1066 1067 private void registerSync() throws JMSException { 1068 session.doStartTransaction(); 1069 if (!synchronizationRegistered) { 1070 synchronizationRegistered = true; 1071 session.getTransactionContext().addSynchronization(new Synchronization() { 1072 @Override 1073 public void beforeEnd() throws Exception { 1074 if (transactedIndividualAck) { 1075 clearDeliveredList(); 1076 waitForRedeliveries(); 1077 synchronized(deliveredMessages) { 1078 rollbackOnFailedRecoveryRedelivery(); 1079 } 1080 } else { 1081 acknowledge(); 1082 } 1083 synchronizationRegistered = false; 1084 } 1085 1086 @Override 1087 public void afterCommit() throws Exception { 1088 commit(); 1089 synchronizationRegistered = false; 1090 } 1091 1092 @Override 1093 public void afterRollback() throws Exception { 1094 rollback(); 1095 synchronizationRegistered = false; 1096 } 1097 }); 1098 } 1099 } 1100 1101 /** 1102 * Acknowledge all the messages that have been delivered to the client up to 1103 * this point. 1104 * 1105 * @throws JMSException 1106 */ 1107 public void acknowledge() throws JMSException { 1108 clearDeliveredList(); 1109 waitForRedeliveries(); 1110 synchronized(deliveredMessages) { 1111 // Acknowledge all messages so far. 1112 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 1113 if (ack == null) { 1114 return; // no msgs 1115 } 1116 1117 if (session.getTransacted()) { 1118 rollbackOnFailedRecoveryRedelivery(); 1119 session.doStartTransaction(); 1120 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 1121 } 1122 1123 pendingAck = null; 1124 session.sendAck(ack); 1125 1126 // Adjust the counters 1127 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); 1128 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1129 1130 if (!session.getTransacted()) { 1131 deliveredMessages.clear(); 1132 } 1133 } 1134 } 1135 1136 private void waitForRedeliveries() { 1137 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) { 1138 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod; 1139 int numberNotReplayed; 1140 do { 1141 numberNotReplayed = 0; 1142 synchronized(deliveredMessages) { 1143 if (previouslyDeliveredMessages != null) { 1144 for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) { 1145 if (!entry.redelivered) { 1146 numberNotReplayed++; 1147 } 1148 } 1149 } 1150 } 1151 if (numberNotReplayed > 0) { 1152 LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}", 1153 numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId); 1154 try { 1155 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4)); 1156 } catch (InterruptedException outOfhere) { 1157 break; 1158 } 1159 } 1160 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis()); 1161 } 1162 } 1163 1164 /* 1165 * called with deliveredMessages locked 1166 */ 1167 private void rollbackOnFailedRecoveryRedelivery() throws JMSException { 1168 if (previouslyDeliveredMessages != null) { 1169 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback 1170 // as messages have been dispatched else where. 1171 int numberNotReplayed = 0; 1172 for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) { 1173 if (!entry.redelivered) { 1174 numberNotReplayed++; 1175 LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}", 1176 previouslyDeliveredMessages.transactionId, entry.message.getMessageId()); 1177 } 1178 } 1179 if (numberNotReplayed > 0) { 1180 String message = "rolling back transaction (" 1181 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed 1182 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); 1183 LOG.warn(message); 1184 throw new TransactionRolledBackException(message); 1185 } 1186 } 1187 } 1188 1189 void acknowledge(MessageDispatch md) throws JMSException { 1190 acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE); 1191 } 1192 1193 void acknowledge(MessageDispatch md, byte ackType) throws JMSException { 1194 MessageAck ack = new MessageAck(md, ackType, 1); 1195 if (ack.isExpiredAck()) { 1196 ack.setFirstMessageId(ack.getLastMessageId()); 1197 } 1198 session.sendAck(ack); 1199 synchronized(deliveredMessages){ 1200 deliveredMessages.remove(md); 1201 } 1202 } 1203 1204 public void commit() throws JMSException { 1205 synchronized (deliveredMessages) { 1206 deliveredMessages.clear(); 1207 clearPreviouslyDelivered(); 1208 } 1209 redeliveryDelay = 0; 1210 } 1211 1212 public void rollback() throws JMSException { 1213 clearDeliveredList(); 1214 synchronized (unconsumedMessages.getMutex()) { 1215 if (optimizeAcknowledge) { 1216 // remove messages read but not acked at the broker yet through 1217 // optimizeAcknowledge 1218 if (!this.info.isBrowser()) { 1219 synchronized(deliveredMessages) { 1220 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { 1221 // ensure we don't filter this as a duplicate 1222 MessageDispatch md = deliveredMessages.removeLast(); 1223 session.connection.rollbackDuplicate(this, md.getMessage()); 1224 } 1225 } 1226 } 1227 } 1228 synchronized(deliveredMessages) { 1229 rollbackPreviouslyDeliveredAndNotRedelivered(); 1230 if (deliveredMessages.isEmpty()) { 1231 return; 1232 } 1233 1234 // use initial delay for first redelivery 1235 MessageDispatch lastMd = deliveredMessages.getFirst(); 1236 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); 1237 if (currentRedeliveryCount > 0) { 1238 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 1239 } else { 1240 redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 1241 } 1242 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); 1243 1244 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1245 MessageDispatch md = iter.next(); 1246 md.getMessage().onMessageRolledBack(); 1247 // ensure we don't filter this as a duplicate 1248 session.connection.rollbackDuplicate(this, md.getMessage()); 1249 } 1250 1251 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 1252 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { 1253 // We need to NACK the messages so that they get sent to the 1254 // DLQ. 1255 // Acknowledge the last message. 1256 1257 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); 1258 ack.setFirstMessageId(firstMsgId); 1259 ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy 1260 + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause())); 1261 session.sendAck(ack,true); 1262 // Adjust the window size. 1263 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1264 redeliveryDelay = 0; 1265 1266 deliveredCounter -= deliveredMessages.size(); 1267 deliveredMessages.clear(); 1268 1269 } else { 1270 1271 // only redelivery_ack after first delivery 1272 if (currentRedeliveryCount > 0) { 1273 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); 1274 ack.setFirstMessageId(firstMsgId); 1275 session.sendAck(ack,true); 1276 } 1277 1278 final LinkedList<MessageDispatch> pendingSessionRedelivery = 1279 new LinkedList<MessageDispatch>(deliveredMessages); 1280 1281 captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(false); 1282 1283 deliveredCounter -= deliveredMessages.size(); 1284 deliveredMessages.clear(); 1285 1286 if (!unconsumedMessages.isClosed()) { 1287 1288 if (nonBlockingRedelivery) { 1289 Collections.reverse(pendingSessionRedelivery); 1290 1291 // Start up the delivery again a little later. 1292 session.getScheduler().executeAfterDelay(new Runnable() { 1293 @Override 1294 public void run() { 1295 try { 1296 if (!unconsumedMessages.isClosed()) { 1297 for(MessageDispatch dispatch : pendingSessionRedelivery) { 1298 session.dispatch(dispatch); 1299 } 1300 } 1301 } catch (Exception e) { 1302 session.connection.onAsyncException(e); 1303 } 1304 } 1305 }, redeliveryDelay); 1306 1307 } else { 1308 // stop the delivery of messages. 1309 unconsumedMessages.stop(); 1310 1311 final ActiveMQMessageConsumer dispatcher = this; 1312 1313 Runnable redispatchWork = new Runnable() { 1314 @Override 1315 public void run() { 1316 try { 1317 if (!unconsumedMessages.isClosed()) { 1318 synchronized (unconsumedMessages.getMutex()) { 1319 for (MessageDispatch md : pendingSessionRedelivery) { 1320 unconsumedMessages.enqueueFirst(md); 1321 } 1322 1323 if (messageListener.get() != null) { 1324 session.redispatch(dispatcher, unconsumedMessages); 1325 } 1326 } 1327 if (started.get()) { 1328 start(); 1329 } 1330 } 1331 } catch (JMSException e) { 1332 session.connection.onAsyncException(e); 1333 } 1334 } 1335 }; 1336 1337 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { 1338 // Start up the delivery again a little later. 1339 session.getScheduler().executeAfterDelay(redispatchWork, redeliveryDelay); 1340 } else { 1341 redispatchWork.run(); 1342 } 1343 } 1344 } else { 1345 for (MessageDispatch md : pendingSessionRedelivery) { 1346 session.connection.rollbackDuplicate(this, md.getMessage()); 1347 } 1348 } 1349 } 1350 } 1351 } 1352 } 1353 1354 /* 1355 * called with unconsumedMessages && deliveredMessages locked 1356 * remove any message not re-delivered as they can't be replayed to this 1357 * consumer on rollback 1358 */ 1359 private void rollbackPreviouslyDeliveredAndNotRedelivered() { 1360 if (previouslyDeliveredMessages != null) { 1361 for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) { 1362 if (!entry.redelivered) { 1363 LOG.trace("rollback non redelivered: {}", entry.message.getMessageId()); 1364 removeFromDeliveredMessages(entry.message.getMessageId()); 1365 } 1366 } 1367 clearPreviouslyDelivered(); 1368 } 1369 } 1370 1371 /* 1372 * called with deliveredMessages locked 1373 */ 1374 private void removeFromDeliveredMessages(MessageId key) { 1375 Iterator<MessageDispatch> iterator = deliveredMessages.iterator(); 1376 while (iterator.hasNext()) { 1377 MessageDispatch candidate = iterator.next(); 1378 if (key.equals(candidate.getMessage().getMessageId())) { 1379 session.connection.rollbackDuplicate(this, candidate.getMessage()); 1380 iterator.remove(); 1381 break; 1382 } 1383 } 1384 } 1385 1386 /* 1387 * called with deliveredMessages locked 1388 */ 1389 private void clearPreviouslyDelivered() { 1390 if (previouslyDeliveredMessages != null) { 1391 previouslyDeliveredMessages.clear(); 1392 previouslyDeliveredMessages = null; 1393 } 1394 } 1395 1396 @Override 1397 public void dispatch(MessageDispatch md) { 1398 MessageListener listener = this.messageListener.get(); 1399 try { 1400 clearMessagesInProgress(); 1401 clearDeliveredList(); 1402 synchronized (unconsumedMessages.getMutex()) { 1403 if (!unconsumedMessages.isClosed()) { 1404 // deliverySequenceId non zero means previously queued dispatch 1405 if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) { 1406 if (listener != null && unconsumedMessages.isRunning()) { 1407 if (redeliveryExceeded(md)) { 1408 posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); 1409 return; 1410 } 1411 ActiveMQMessage message = createActiveMQMessage(md); 1412 beforeMessageIsConsumed(md); 1413 try { 1414 boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); 1415 if (!expired) { 1416 listener.onMessage(message); 1417 } 1418 afterMessageIsConsumed(md, expired); 1419 } catch (RuntimeException e) { 1420 LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e); 1421 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { 1422 // schedual redelivery and possible dlq processing 1423 md.setRollbackCause(e); 1424 rollback(); 1425 } else { 1426 // Transacted or Client ack: Deliver the 1427 // next message. 1428 afterMessageIsConsumed(md, false); 1429 } 1430 } 1431 } else { 1432 md.setDeliverySequenceId(-1); // skip duplicate check on subsequent queued delivery 1433 if (md.getMessage() == null) { 1434 // End of browse or pull request timeout. 1435 unconsumedMessages.enqueue(md); 1436 } else { 1437 if (!consumeExpiredMessage(md)) { 1438 unconsumedMessages.enqueue(md); 1439 if (availableListener != null) { 1440 availableListener.onMessageAvailable(this); 1441 } 1442 } else { 1443 beforeMessageIsConsumed(md); 1444 afterMessageIsConsumed(md, true); 1445 1446 // Pull consumer needs to check if pull timed out and send 1447 // a new pull command if not. 1448 if (info.getCurrentPrefetchSize() == 0) { 1449 unconsumedMessages.enqueue(null); 1450 } 1451 } 1452 } 1453 } 1454 } else { 1455 // deal with duplicate delivery 1456 ConsumerId consumerWithPendingTransaction; 1457 if (redeliveryExpectedInCurrentTransaction(md, true)) { 1458 LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage()); 1459 if (transactedIndividualAck) { 1460 immediateIndividualTransactedAck(md); 1461 } else { 1462 session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1)); 1463 } 1464 } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) { 1465 LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction); 1466 session.getConnection().rollbackDuplicate(this, md.getMessage()); 1467 dispatch(md); 1468 } else { 1469 LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md); 1470 posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); 1471 } 1472 } 1473 } 1474 } 1475 if (++dispatchedCount % 1000 == 0) { 1476 dispatchedCount = 0; 1477 Thread.yield(); 1478 } 1479 } catch (Exception e) { 1480 session.connection.onClientInternalException(e); 1481 } 1482 } 1483 1484 private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) { 1485 if (session.isTransacted()) { 1486 synchronized (deliveredMessages) { 1487 if (previouslyDeliveredMessages != null) { 1488 PreviouslyDelivered entry; 1489 if ((entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId())) != null) { 1490 if (markReceipt) { 1491 entry.redelivered = true; 1492 } 1493 return true; 1494 } 1495 } 1496 } 1497 } 1498 return false; 1499 } 1500 1501 private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) { 1502 for (ActiveMQSession activeMQSession: session.connection.getSessions()) { 1503 for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) { 1504 if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) { 1505 return activeMQMessageConsumer.getConsumerId(); 1506 } 1507 } 1508 } 1509 return null; 1510 } 1511 1512 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again 1513 private void clearDeliveredList() { 1514 if (clearDeliveredList) { 1515 synchronized (deliveredMessages) { 1516 if (clearDeliveredList) { 1517 if (!deliveredMessages.isEmpty()) { 1518 if (session.isTransacted()) { 1519 captureDeliveredMessagesForDuplicateSuppression(); 1520 } else { 1521 if (session.isClientAcknowledge()) { 1522 LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); 1523 // allow redelivery 1524 if (!this.info.isBrowser()) { 1525 for (MessageDispatch md: deliveredMessages) { 1526 this.session.connection.rollbackDuplicate(this, md.getMessage()); 1527 } 1528 } 1529 } 1530 LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); 1531 deliveredMessages.clear(); 1532 pendingAck = null; 1533 } 1534 } 1535 clearDeliveredList = false; 1536 } 1537 } 1538 } 1539 } 1540 1541 // called with deliveredMessages locked 1542 private void captureDeliveredMessagesForDuplicateSuppression() { 1543 captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery (true); 1544 } 1545 1546 private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(boolean requireRedelivery) { 1547 if (previouslyDeliveredMessages == null) { 1548 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>(session.getTransactionContext().getTransactionId()); 1549 } 1550 for (MessageDispatch delivered : deliveredMessages) { 1551 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), new PreviouslyDelivered(delivered, !requireRedelivery)); 1552 } 1553 LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size()); 1554 } 1555 1556 public int getMessageSize() { 1557 return unconsumedMessages.size(); 1558 } 1559 1560 public void start() throws JMSException { 1561 if (unconsumedMessages.isClosed()) { 1562 return; 1563 } 1564 started.set(true); 1565 unconsumedMessages.start(); 1566 session.executor.wakeup(); 1567 } 1568 1569 public void stop() { 1570 started.set(false); 1571 unconsumedMessages.stop(); 1572 } 1573 1574 @Override 1575 public String toString() { 1576 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() 1577 + " }"; 1578 } 1579 1580 /** 1581 * Delivers a message to the message listener. 1582 * 1583 * @return 1584 * @throws JMSException 1585 */ 1586 public boolean iterate() { 1587 MessageListener listener = this.messageListener.get(); 1588 if (listener != null) { 1589 MessageDispatch md = unconsumedMessages.dequeueNoWait(); 1590 if (md != null) { 1591 dispatch(md); 1592 return true; 1593 } 1594 } 1595 return false; 1596 } 1597 1598 public boolean isInUse(ActiveMQTempDestination destination) { 1599 return info.getDestination().equals(destination); 1600 } 1601 1602 public long getLastDeliveredSequenceId() { 1603 return lastDeliveredSequenceId; 1604 } 1605 1606 public IOException getFailureError() { 1607 return failureError; 1608 } 1609 1610 public void setFailureError(IOException failureError) { 1611 this.failureError = failureError; 1612 } 1613 1614 /** 1615 * @return the optimizedAckScheduledAckInterval 1616 */ 1617 public long getOptimizedAckScheduledAckInterval() { 1618 return optimizedAckScheduledAckInterval; 1619 } 1620 1621 /** 1622 * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set 1623 */ 1624 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException { 1625 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; 1626 1627 if (this.optimizedAckTask != null) { 1628 try { 1629 this.session.connection.getScheduler().cancel(optimizedAckTask); 1630 } catch (JMSException e) { 1631 LOG.debug("Caught exception while cancelling old optimized ack task", e); 1632 throw e; 1633 } 1634 this.optimizedAckTask = null; 1635 } 1636 1637 // Should we periodically send out all outstanding acks. 1638 if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) { 1639 this.optimizedAckTask = new Runnable() { 1640 1641 @Override 1642 public void run() { 1643 try { 1644 if (optimizeAcknowledge && !unconsumedMessages.isClosed()) { 1645 LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId()); 1646 deliverAcks(); 1647 } 1648 } catch (Exception e) { 1649 LOG.debug("Optimized Ack Task caught exception during ack", e); 1650 } 1651 } 1652 }; 1653 1654 try { 1655 this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval); 1656 } catch (JMSException e) { 1657 LOG.debug("Caught exception while scheduling new optimized ack task", e); 1658 throw e; 1659 } 1660 } 1661 } 1662 1663 public boolean hasMessageListener() { 1664 return messageListener.get() != null; 1665 } 1666 1667 public boolean isConsumerExpiryCheckEnabled() { 1668 return consumerExpiryCheckEnabled; 1669 } 1670 1671 public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { 1672 this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; 1673 } 1674}