001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033
034import javax.jms.IllegalStateException;
035import javax.jms.InvalidDestinationException;
036import javax.jms.JMSException;
037import javax.jms.Message;
038import javax.jms.MessageConsumer;
039import javax.jms.MessageListener;
040import javax.jms.TransactionRolledBackException;
041
042import org.apache.activemq.blob.BlobDownloader;
043import org.apache.activemq.command.*;
044import org.apache.activemq.management.JMSConsumerStatsImpl;
045import org.apache.activemq.management.StatsCapable;
046import org.apache.activemq.management.StatsImpl;
047import org.apache.activemq.selector.SelectorParser;
048import org.apache.activemq.transaction.Synchronization;
049import org.apache.activemq.util.Callback;
050import org.apache.activemq.util.IntrospectionSupport;
051import org.apache.activemq.util.JMSExceptionSupport;
052import org.apache.activemq.util.ThreadPoolUtils;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
058 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
059 * passing a <CODE>Destination</CODE> object to a message-consumer creation
060 * method supplied by a session.
061 * <P>
062 * <CODE>MessageConsumer</CODE> is the parent interface for all message
063 * consumers.
064 * <P>
065 * A message consumer can be created with a message selector. A message selector
066 * allows the client to restrict the messages delivered to the message consumer
067 * to those that match the selector.
068 * <P>
069 * A client may either synchronously receive a message consumer's messages or
070 * have the consumer asynchronously deliver them as they arrive.
071 * <P>
072 * For synchronous receipt, a client can request the next message from a message
073 * consumer using one of its <CODE> receive</CODE> methods. There are several
074 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
075 * the next message.
076 * <P>
077 * For asynchronous delivery, a client can register a
078 * <CODE>MessageListener</CODE> object with a message consumer. As messages
079 * arrive at the message consumer, it delivers them by calling the
080 * <CODE>MessageListener</CODE>'s<CODE>
081 * onMessage</CODE> method.
082 * <P>
083 * It is a client programming error for a <CODE>MessageListener</CODE> to
084 * throw an exception.
085 *
086 *
087 * @see javax.jms.MessageConsumer
088 * @see javax.jms.QueueReceiver
089 * @see javax.jms.TopicSubscriber
090 * @see javax.jms.Session
091 */
092public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
093
094    @SuppressWarnings("serial")
095    class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
096        final TransactionId transactionId;
097        public PreviouslyDeliveredMap(TransactionId transactionId) {
098            this.transactionId = transactionId;
099        }
100    }
101
102    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
103    protected final ActiveMQSession session;
104    protected final ConsumerInfo info;
105
106    // These are the messages waiting to be delivered to the client
107    protected final MessageDispatchChannel unconsumedMessages;
108
109    // The are the messages that were delivered to the consumer but that have
110    // not been acknowledged. It's kept in reverse order since we
111    // Always walk list in reverse order.
112    protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
113    // track duplicate deliveries in a transaction such that the tx integrity can be validated
114    private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
115    private int deliveredCounter;
116    private int additionalWindowSize;
117    private long redeliveryDelay;
118    private int ackCounter;
119    private int dispatchedCount;
120    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
121    private final JMSConsumerStatsImpl stats;
122
123    private final String selector;
124    private boolean synchronizationRegistered;
125    private final AtomicBoolean started = new AtomicBoolean(false);
126
127    private MessageAvailableListener availableListener;
128
129    private RedeliveryPolicy redeliveryPolicy;
130    private boolean optimizeAcknowledge;
131    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
132    private ExecutorService executorService;
133    private MessageTransformer transformer;
134    private boolean clearDeliveredList;
135    AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
136
137    private MessageAck pendingAck;
138    private long lastDeliveredSequenceId = -1;
139
140    private IOException failureError;
141
142    private long optimizeAckTimestamp = System.currentTimeMillis();
143    private long optimizeAcknowledgeTimeOut = 0;
144    private long optimizedAckScheduledAckInterval = 0;
145    private Runnable optimizedAckTask;
146    private long failoverRedeliveryWaitPeriod = 0;
147    private boolean transactedIndividualAck = false;
148    private boolean nonBlockingRedelivery = false;
149    private boolean consumerExpiryCheckEnabled = true;
150
151    /**
152     * Create a MessageConsumer
153     *
154     * @param session
155     * @param dest
156     * @param name
157     * @param selector
158     * @param prefetch
159     * @param maximumPendingMessageCount
160     * @param noLocal
161     * @param browser
162     * @param dispatchAsync
163     * @param messageListener
164     * @throws JMSException
165     */
166    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
167            String name, String selector, int prefetch,
168            int maximumPendingMessageCount, boolean noLocal, boolean browser,
169            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
170        if (dest == null) {
171            throw new InvalidDestinationException("Don't understand null destinations");
172        } else if (dest.getPhysicalName() == null) {
173            throw new InvalidDestinationException("The destination object was not given a physical name.");
174        } else if (dest.isTemporary()) {
175            String physicalName = dest.getPhysicalName();
176
177            if (physicalName == null) {
178                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
179            }
180
181            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
182
183            if (physicalName.indexOf(connectionID) < 0) {
184                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
185            }
186
187            if (session.connection.isDeleted(dest)) {
188                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
189            }
190            if (prefetch < 0) {
191                throw new JMSException("Cannot have a prefetch size less than zero");
192            }
193        }
194        if (session.connection.isMessagePrioritySupported()) {
195            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
196        }else {
197            this.unconsumedMessages = new FifoMessageDispatchChannel();
198        }
199
200        this.session = session;
201        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
202        if (this.redeliveryPolicy == null) {
203            this.redeliveryPolicy = new RedeliveryPolicy();
204        }
205        setTransformer(session.getTransformer());
206
207        this.info = new ConsumerInfo(consumerId);
208        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
209        this.info.setClientId(this.session.connection.getClientID());
210        this.info.setSubscriptionName(name);
211        this.info.setPrefetchSize(prefetch);
212        this.info.setCurrentPrefetchSize(prefetch);
213        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
214        this.info.setNoLocal(noLocal);
215        this.info.setDispatchAsync(dispatchAsync);
216        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
217        this.info.setSelector(null);
218
219        // Allows the options on the destination to configure the consumerInfo
220        if (dest.getOptions() != null) {
221            Map<String, Object> options = IntrospectionSupport.extractProperties(
222                new HashMap<String, Object>(dest.getOptions()), "consumer.");
223            IntrospectionSupport.setProperties(this.info, options);
224            if (options.size() > 0) {
225                String msg = "There are " + options.size()
226                    + " consumer options that couldn't be set on the consumer."
227                    + " Check the options are spelled correctly."
228                    + " Unknown parameters=[" + options + "]."
229                    + " This consumer cannot be started.";
230                LOG.warn(msg);
231                throw new ConfigurationException(msg);
232            }
233        }
234
235        this.info.setDestination(dest);
236        this.info.setBrowser(browser);
237        if (selector != null && selector.trim().length() != 0) {
238            // Validate the selector
239            SelectorParser.parse(selector);
240            this.info.setSelector(selector);
241            this.selector = selector;
242        } else if (info.getSelector() != null) {
243            // Validate the selector
244            SelectorParser.parse(this.info.getSelector());
245            this.selector = this.info.getSelector();
246        } else {
247            this.selector = null;
248        }
249
250        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
251        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
252                                   && !info.isBrowser();
253        if (this.optimizeAcknowledge) {
254            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
255            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
256        }
257
258        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
259        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
260        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
261        this.transactedIndividualAck = session.connection.isTransactedIndividualAck()
262                        || this.nonBlockingRedelivery
263                        || session.connection.isMessagePrioritySupported();
264        this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled();
265        if (messageListener != null) {
266            setMessageListener(messageListener);
267        }
268        try {
269            this.session.addConsumer(this);
270            this.session.syncSendPacket(info);
271        } catch (JMSException e) {
272            this.session.removeConsumer(this);
273            throw e;
274        }
275
276        if (session.connection.isStarted()) {
277            start();
278        }
279    }
280
281    private boolean isAutoAcknowledgeEach() {
282        return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
283    }
284
285    private boolean isAutoAcknowledgeBatch() {
286        return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
287    }
288
289    @Override
290    public StatsImpl getStats() {
291        return stats;
292    }
293
294    public JMSConsumerStatsImpl getConsumerStats() {
295        return stats;
296    }
297
298    public RedeliveryPolicy getRedeliveryPolicy() {
299        return redeliveryPolicy;
300    }
301
302    /**
303     * Sets the redelivery policy used when messages are redelivered
304     */
305    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
306        this.redeliveryPolicy = redeliveryPolicy;
307    }
308
309    public MessageTransformer getTransformer() {
310        return transformer;
311    }
312
313    /**
314     * Sets the transformer used to transform messages before they are sent on
315     * to the JMS bus
316     */
317    public void setTransformer(MessageTransformer transformer) {
318        this.transformer = transformer;
319    }
320
321    /**
322     * @return Returns the value.
323     */
324    public ConsumerId getConsumerId() {
325        return info.getConsumerId();
326    }
327
328    /**
329     * @return the consumer name - used for durable consumers
330     */
331    public String getConsumerName() {
332        return this.info.getSubscriptionName();
333    }
334
335    /**
336     * @return true if this consumer does not accept locally produced messages
337     */
338    protected boolean isNoLocal() {
339        return info.isNoLocal();
340    }
341
342    /**
343     * Retrieve is a browser
344     *
345     * @return true if a browser
346     */
347    protected boolean isBrowser() {
348        return info.isBrowser();
349    }
350
351    /**
352     * @return ActiveMQDestination
353     */
354    protected ActiveMQDestination getDestination() {
355        return info.getDestination();
356    }
357
358    /**
359     * @return Returns the prefetchNumber.
360     */
361    public int getPrefetchNumber() {
362        return info.getPrefetchSize();
363    }
364
365    /**
366     * @return true if this is a durable topic subscriber
367     */
368    public boolean isDurableSubscriber() {
369        return info.getSubscriptionName() != null && info.getDestination().isTopic();
370    }
371
372    /**
373     * Gets this message consumer's message selector expression.
374     *
375     * @return this message consumer's message selector, or null if no message
376     *         selector exists for the message consumer (that is, if the message
377     *         selector was not set or was set to null or the empty string)
378     * @throws JMSException if the JMS provider fails to receive the next
379     *                 message due to some internal error.
380     */
381    @Override
382    public String getMessageSelector() throws JMSException {
383        checkClosed();
384        return selector;
385    }
386
387    /**
388     * Gets the message consumer's <CODE>MessageListener</CODE>.
389     *
390     * @return the listener for the message consumer, or null if no listener is
391     *         set
392     * @throws JMSException if the JMS provider fails to get the message
393     *                 listener due to some internal error.
394     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
395     */
396    @Override
397    public MessageListener getMessageListener() throws JMSException {
398        checkClosed();
399        return this.messageListener.get();
400    }
401
402    /**
403     * Sets the message consumer's <CODE>MessageListener</CODE>.
404     * <P>
405     * Setting the message listener to null is the equivalent of unsetting the
406     * message listener for the message consumer.
407     * <P>
408     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
409     * while messages are being consumed by an existing listener or the consumer
410     * is being used to consume messages synchronously is undefined.
411     *
412     * @param listener the listener to which the messages are to be delivered
413     * @throws JMSException if the JMS provider fails to receive the next
414     *                 message due to some internal error.
415     * @see javax.jms.MessageConsumer#getMessageListener
416     */
417    @Override
418    public void setMessageListener(MessageListener listener) throws JMSException {
419        checkClosed();
420        if (info.getPrefetchSize() == 0) {
421            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
422        }
423        if (listener != null) {
424            boolean wasRunning = session.isRunning();
425            if (wasRunning) {
426                session.stop();
427            }
428
429            this.messageListener.set(listener);
430            session.redispatch(this, unconsumedMessages);
431
432            if (wasRunning) {
433                session.start();
434            }
435        } else {
436            this.messageListener.set(null);
437        }
438    }
439
440    @Override
441    public MessageAvailableListener getAvailableListener() {
442        return availableListener;
443    }
444
445    /**
446     * Sets the listener used to notify synchronous consumers that there is a
447     * message available so that the {@link MessageConsumer#receiveNoWait()} can
448     * be called.
449     */
450    @Override
451    public void setAvailableListener(MessageAvailableListener availableListener) {
452        this.availableListener = availableListener;
453    }
454
455    /**
456     * Used to get an enqueued message from the unconsumedMessages list. The
457     * amount of time this method blocks is based on the timeout value. - if
458     * timeout==-1 then it blocks until a message is received. - if timeout==0
459     * then it it tries to not block at all, it returns a message if it is
460     * available - if timeout>0 then it blocks up to timeout amount of time.
461     * Expired messages will consumed by this method.
462     *
463     * @throws JMSException
464     * @return null if we timeout or if the consumer is closed.
465     */
466    private MessageDispatch dequeue(long timeout) throws JMSException {
467        try {
468            long deadline = 0;
469            if (timeout > 0) {
470                deadline = System.currentTimeMillis() + timeout;
471            }
472            while (true) {
473                MessageDispatch md = unconsumedMessages.dequeue(timeout);
474                if (md == null) {
475                    if (timeout > 0 && !unconsumedMessages.isClosed()) {
476                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
477                    } else {
478                        if (failureError != null) {
479                            throw JMSExceptionSupport.create(failureError);
480                        } else {
481                            return null;
482                        }
483                    }
484                } else if (md.getMessage() == null) {
485                    return null;
486                } else if (consumeExpiredMessage(md)) {
487                    LOG.debug("{} received expired message: {}", getConsumerId(), md);
488                    beforeMessageIsConsumed(md);
489                    afterMessageIsConsumed(md, true);
490                    if (timeout > 0) {
491                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
492                    }
493                    sendPullCommand(timeout);
494                } else if (redeliveryExceeded(md)) {
495                    LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
496                    posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
497                    if (timeout > 0) {
498                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
499                    }
500                    sendPullCommand(timeout);
501                } else {
502                    if (LOG.isTraceEnabled()) {
503                        LOG.trace(getConsumerId() + " received message: " + md);
504                    }
505                    return md;
506                }
507            }
508        } catch (InterruptedException e) {
509            Thread.currentThread().interrupt();
510            throw JMSExceptionSupport.create(e);
511        }
512    }
513
514    private boolean consumeExpiredMessage(MessageDispatch dispatch) {
515        return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
516    }
517
518    private void posionAck(MessageDispatch md, String cause) throws JMSException {
519        MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
520        posionAck.setFirstMessageId(md.getMessage().getMessageId());
521        posionAck.setPoisonCause(new Throwable(cause));
522        session.sendAck(posionAck);
523    }
524
525    private boolean redeliveryExceeded(MessageDispatch md) {
526        try {
527            return session.getTransacted()
528                    && redeliveryPolicy != null
529                    && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
530                    && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
531                    // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
532                    && md.getMessage().getProperty("redeliveryDelay") == null;
533        } catch (Exception ignored) {
534            return false;
535        }
536    }
537
538    /**
539     * Receives the next message produced for this message consumer.
540     * <P>
541     * This call blocks indefinitely until a message is produced or until this
542     * message consumer is closed.
543     * <P>
544     * If this <CODE>receive</CODE> is done within a transaction, the consumer
545     * retains the message until the transaction commits.
546     *
547     * @return the next message produced for this message consumer, or null if
548     *         this message consumer is concurrently closed
549     */
550    @Override
551    public Message receive() throws JMSException {
552        checkClosed();
553        checkMessageListener();
554
555        sendPullCommand(0);
556        MessageDispatch md = dequeue(-1);
557        if (md == null) {
558            return null;
559        }
560
561        beforeMessageIsConsumed(md);
562        afterMessageIsConsumed(md, false);
563
564        return createActiveMQMessage(md);
565    }
566
567    /**
568     * @param md
569     * @return
570     */
571    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
572        ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
573        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
574            ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
575        }
576        if (m.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
577            ((ActiveMQObjectMessage)m).setTrustAllPackages(session.getConnection().isTrustAllPackages());
578            ((ActiveMQObjectMessage)m).setTrustedPackages(session.getConnection().getTrustedPackages());
579        }
580        if (transformer != null) {
581            Message transformedMessage = transformer.consumerTransform(session, this, m);
582            if (transformedMessage != null) {
583                m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
584            }
585        }
586        if (session.isClientAcknowledge()) {
587            m.setAcknowledgeCallback(new Callback() {
588                @Override
589                public void execute() throws Exception {
590                    session.checkClosed();
591                    session.acknowledge();
592                }
593            });
594        } else if (session.isIndividualAcknowledge()) {
595            m.setAcknowledgeCallback(new Callback() {
596                @Override
597                public void execute() throws Exception {
598                    session.checkClosed();
599                    acknowledge(md);
600                }
601            });
602        }
603        return m;
604    }
605
606    /**
607     * Receives the next message that arrives within the specified timeout
608     * interval.
609     * <P>
610     * This call blocks until a message arrives, the timeout expires, or this
611     * message consumer is closed. A <CODE>timeout</CODE> of zero never
612     * expires, and the call blocks indefinitely.
613     *
614     * @param timeout the timeout value (in milliseconds), a time out of zero
615     *                never expires.
616     * @return the next message produced for this message consumer, or null if
617     *         the timeout expires or this message consumer is concurrently
618     *         closed
619     */
620    @Override
621    public Message receive(long timeout) throws JMSException {
622        checkClosed();
623        checkMessageListener();
624        if (timeout == 0) {
625            return this.receive();
626        }
627
628        sendPullCommand(timeout);
629        while (timeout > 0) {
630
631            MessageDispatch md;
632            if (info.getPrefetchSize() == 0) {
633                md = dequeue(-1); // We let the broker let us know when we timeout.
634            } else {
635                md = dequeue(timeout);
636            }
637
638            if (md == null) {
639                return null;
640            }
641
642            beforeMessageIsConsumed(md);
643            afterMessageIsConsumed(md, false);
644            return createActiveMQMessage(md);
645        }
646        return null;
647    }
648
649    /**
650     * Receives the next message if one is immediately available.
651     *
652     * @return the next message produced for this message consumer, or null if
653     *         one is not available
654     * @throws JMSException if the JMS provider fails to receive the next
655     *                 message due to some internal error.
656     */
657    @Override
658    public Message receiveNoWait() throws JMSException {
659        checkClosed();
660        checkMessageListener();
661        sendPullCommand(-1);
662
663        MessageDispatch md;
664        if (info.getPrefetchSize() == 0) {
665            md = dequeue(-1); // We let the broker let us know when we
666            // timeout.
667        } else {
668            md = dequeue(0);
669        }
670
671        if (md == null) {
672            return null;
673        }
674
675        beforeMessageIsConsumed(md);
676        afterMessageIsConsumed(md, false);
677        return createActiveMQMessage(md);
678    }
679
680    /**
681     * Closes the message consumer.
682     * <P>
683     * Since a provider may allocate some resources on behalf of a <CODE>
684     * MessageConsumer</CODE>
685     * outside the Java virtual machine, clients should close them when they are
686     * not needed. Relying on garbage collection to eventually reclaim these
687     * resources may not be timely enough.
688     * <P>
689     * This call blocks until a <CODE>receive</CODE> or message listener in
690     * progress has completed. A blocked message consumer <CODE>receive </CODE>
691     * call returns null when this message consumer is closed.
692     *
693     * @throws JMSException if the JMS provider fails to close the consumer due
694     *                 to some internal error.
695     */
696    @Override
697    public void close() throws JMSException {
698        if (!unconsumedMessages.isClosed()) {
699            if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) {
700                session.getTransactionContext().addSynchronization(new Synchronization() {
701                    @Override
702                    public void afterCommit() throws Exception {
703                        doClose();
704                    }
705
706                    @Override
707                    public void afterRollback() throws Exception {
708                        doClose();
709                    }
710                });
711            } else {
712                doClose();
713            }
714        }
715    }
716
717    void doClose() throws JMSException {
718        dispose();
719        RemoveInfo removeCommand = info.createRemoveCommand();
720        LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
721        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
722        this.session.asyncSendPacket(removeCommand);
723    }
724
725    void inProgressClearRequired() {
726        inProgressClearRequiredFlag.incrementAndGet();
727        // deal with delivered messages async to avoid lock contention with in progress acks
728        clearDeliveredList = true;
729        // force a rollback if we will be acking in a transaction after/during failover
730        // bc acks are async they may not get there reliably on reconnect and the consumer
731        // may not be aware of the reconnect in a timely fashion if in onMessage
732        if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) {
733            session.getTransactionContext().setRollbackOnly(true);
734        }
735    }
736
737    void clearMessagesInProgress() {
738        if (inProgressClearRequiredFlag.get() > 0) {
739            synchronized (unconsumedMessages.getMutex()) {
740                if (inProgressClearRequiredFlag.get() > 0) {
741                    LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
742                    // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
743                    List<MessageDispatch> list = unconsumedMessages.removeAll();
744                    if (!this.info.isBrowser()) {
745                        for (MessageDispatch old : list) {
746                            session.connection.rollbackDuplicate(this, old.getMessage());
747                        }
748                    }
749                    // allow dispatch on this connection to resume
750                    session.connection.transportInterruptionProcessingComplete();
751                    inProgressClearRequiredFlag.decrementAndGet();
752
753                    // Wake up any blockers and allow them to recheck state.
754                    unconsumedMessages.getMutex().notifyAll();
755                }
756            }
757        }
758        clearDeliveredList();
759    }
760
761    void deliverAcks() {
762        MessageAck ack = null;
763        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
764            if (isAutoAcknowledgeEach()) {
765                synchronized(deliveredMessages) {
766                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
767                    if (ack != null) {
768                        deliveredMessages.clear();
769                        ackCounter = 0;
770                    } else {
771                        ack = pendingAck;
772                        pendingAck = null;
773                    }
774                }
775            } else if (pendingAck != null && pendingAck.isStandardAck()) {
776                ack = pendingAck;
777                pendingAck = null;
778            }
779            if (ack != null) {
780                final MessageAck ackToSend = ack;
781
782                if (executorService == null) {
783                    executorService = Executors.newSingleThreadExecutor();
784                }
785                executorService.submit(new Runnable() {
786                    @Override
787                    public void run() {
788                        try {
789                            session.sendAck(ackToSend,true);
790                        } catch (JMSException e) {
791                            LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
792                        } finally {
793                            deliveryingAcknowledgements.set(false);
794                        }
795                    }
796                });
797            } else {
798                deliveryingAcknowledgements.set(false);
799            }
800        }
801    }
802
803    public void dispose() throws JMSException {
804        if (!unconsumedMessages.isClosed()) {
805
806            // Do we have any acks we need to send out before closing?
807            // Ack any delivered messages now.
808            if (!session.getTransacted()) {
809                deliverAcks();
810                if (isAutoAcknowledgeBatch()) {
811                    acknowledge();
812                }
813            }
814            if (executorService != null) {
815                ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
816                executorService = null;
817            }
818            if (optimizedAckTask != null) {
819                this.session.connection.getScheduler().cancel(optimizedAckTask);
820                optimizedAckTask = null;
821            }
822
823            if (session.isClientAcknowledge()) {
824                if (!this.info.isBrowser()) {
825                    // rollback duplicates that aren't acknowledged
826                    List<MessageDispatch> tmp = null;
827                    synchronized (this.deliveredMessages) {
828                        tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
829                    }
830                    for (MessageDispatch old : tmp) {
831                        this.session.connection.rollbackDuplicate(this, old.getMessage());
832                    }
833                    tmp.clear();
834                }
835            }
836            if (!session.isTransacted()) {
837                synchronized(deliveredMessages) {
838                    deliveredMessages.clear();
839                }
840            }
841            unconsumedMessages.close();
842            this.session.removeConsumer(this);
843            List<MessageDispatch> list = unconsumedMessages.removeAll();
844            if (!this.info.isBrowser()) {
845                for (MessageDispatch old : list) {
846                    // ensure we don't filter this as a duplicate
847                    LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId());
848                    session.connection.rollbackDuplicate(this, old.getMessage());
849                }
850            }
851        }
852    }
853
854    /**
855     * @throws IllegalStateException
856     */
857    protected void checkClosed() throws IllegalStateException {
858        if (unconsumedMessages.isClosed()) {
859            throw new IllegalStateException("The Consumer is closed");
860        }
861    }
862
863    /**
864     * If we have a zero prefetch specified then send a pull command to the
865     * broker to pull a message we are about to receive
866     */
867    protected void sendPullCommand(long timeout) throws JMSException {
868        clearDeliveredList();
869        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
870            MessagePull messagePull = new MessagePull();
871            messagePull.configure(info);
872            messagePull.setTimeout(timeout);
873            session.asyncSendPacket(messagePull);
874        }
875    }
876
877    protected void checkMessageListener() throws JMSException {
878        session.checkMessageListener();
879    }
880
881    protected void setOptimizeAcknowledge(boolean value) {
882        if (optimizeAcknowledge && !value) {
883            deliverAcks();
884        }
885        optimizeAcknowledge = value;
886    }
887
888    protected void setPrefetchSize(int prefetch) {
889        deliverAcks();
890        this.info.setCurrentPrefetchSize(prefetch);
891    }
892
893    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
894        md.setDeliverySequenceId(session.getNextDeliveryId());
895        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
896        if (!isAutoAcknowledgeBatch()) {
897            synchronized(deliveredMessages) {
898                deliveredMessages.addFirst(md);
899            }
900            if (session.getTransacted()) {
901                if (transactedIndividualAck) {
902                    immediateIndividualTransactedAck(md);
903                } else {
904                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
905                }
906            }
907        }
908    }
909
910    private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
911        // acks accumulate on the broker pending transaction completion to indicate
912        // delivery status
913        registerSync();
914        MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
915        ack.setTransactionId(session.getTransactionContext().getTransactionId());
916        session.sendAck(ack);
917    }
918
919    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
920        if (unconsumedMessages.isClosed()) {
921            return;
922        }
923        if (messageExpired) {
924            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
925            stats.getExpiredMessageCount().increment();
926        } else {
927            stats.onMessage();
928            if (session.getTransacted()) {
929                // Do nothing.
930            } else if (isAutoAcknowledgeEach()) {
931                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
932                    synchronized (deliveredMessages) {
933                        if (!deliveredMessages.isEmpty()) {
934                            if (optimizeAcknowledge) {
935                                ackCounter++;
936
937                                // AMQ-3956 evaluate both expired and normal msgs as
938                                // otherwise consumer may get stalled
939                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
940                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
941                                    if (ack != null) {
942                                        deliveredMessages.clear();
943                                        ackCounter = 0;
944                                        session.sendAck(ack);
945                                        optimizeAckTimestamp = System.currentTimeMillis();
946                                    }
947                                    // AMQ-3956 - as further optimization send
948                                    // ack for expired msgs when there are any.
949                                    // This resets the deliveredCounter to 0 so that
950                                    // we won't sent standard acks with every msg just
951                                    // because the deliveredCounter just below
952                                    // 0.5 * prefetch as used in ackLater()
953                                    if (pendingAck != null && deliveredCounter > 0) {
954                                        session.sendAck(pendingAck);
955                                        pendingAck = null;
956                                        deliveredCounter = 0;
957                                    }
958                                }
959                            } else {
960                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
961                                if (ack!=null) {
962                                    deliveredMessages.clear();
963                                    session.sendAck(ack);
964                                }
965                            }
966                        }
967                    }
968                    deliveryingAcknowledgements.set(false);
969                }
970            } else if (isAutoAcknowledgeBatch()) {
971                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
972            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
973                boolean messageUnackedByConsumer = false;
974                synchronized (deliveredMessages) {
975                    messageUnackedByConsumer = deliveredMessages.contains(md);
976                }
977                if (messageUnackedByConsumer) {
978                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
979                }
980            }
981            else {
982                throw new IllegalStateException("Invalid session state.");
983            }
984        }
985    }
986
987    /**
988     * Creates a MessageAck for all messages contained in deliveredMessages.
989     * Caller should hold the lock for deliveredMessages.
990     *
991     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
992     * @return <code>null</code> if nothing to ack.
993     */
994    private MessageAck makeAckForAllDeliveredMessages(byte type) {
995        synchronized (deliveredMessages) {
996            if (deliveredMessages.isEmpty()) {
997                return null;
998            }
999
1000            MessageDispatch md = deliveredMessages.getFirst();
1001            MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
1002            ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
1003            return ack;
1004        }
1005    }
1006
1007    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
1008
1009        // Don't acknowledge now, but we may need to let the broker know the
1010        // consumer got the message to expand the pre-fetch window
1011        if (session.getTransacted()) {
1012            registerSync();
1013        }
1014
1015        deliveredCounter++;
1016
1017        MessageAck oldPendingAck = pendingAck;
1018        pendingAck = new MessageAck(md, ackType, deliveredCounter);
1019        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
1020        if( oldPendingAck==null ) {
1021            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
1022        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
1023            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
1024        } else {
1025            // old pending ack being superseded by ack of another type, if is is not a delivered
1026            // ack and hence important, send it now so it is not lost.
1027            if (!oldPendingAck.isDeliveredAck()) {
1028                LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
1029                session.sendAck(oldPendingAck);
1030            } else {
1031                LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
1032            }
1033        }
1034        // AMQ-3956 evaluate both expired and normal msgs as
1035        // otherwise consumer may get stalled
1036        if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
1037            LOG.debug("ackLater: sending: {}", pendingAck);
1038            session.sendAck(pendingAck);
1039            pendingAck=null;
1040            deliveredCounter = 0;
1041            additionalWindowSize = 0;
1042        }
1043    }
1044
1045    private void registerSync() throws JMSException {
1046        session.doStartTransaction();
1047        if (!synchronizationRegistered) {
1048            synchronizationRegistered = true;
1049            session.getTransactionContext().addSynchronization(new Synchronization() {
1050                @Override
1051                public void beforeEnd() throws Exception {
1052                    if (transactedIndividualAck) {
1053                        clearDeliveredList();
1054                        waitForRedeliveries();
1055                        synchronized(deliveredMessages) {
1056                            rollbackOnFailedRecoveryRedelivery();
1057                        }
1058                    } else {
1059                        acknowledge();
1060                    }
1061                    synchronizationRegistered = false;
1062                }
1063
1064                @Override
1065                public void afterCommit() throws Exception {
1066                    commit();
1067                    synchronizationRegistered = false;
1068                }
1069
1070                @Override
1071                public void afterRollback() throws Exception {
1072                    rollback();
1073                    synchronizationRegistered = false;
1074                }
1075            });
1076        }
1077    }
1078
1079    /**
1080     * Acknowledge all the messages that have been delivered to the client up to
1081     * this point.
1082     *
1083     * @throws JMSException
1084     */
1085    public void acknowledge() throws JMSException {
1086        clearDeliveredList();
1087        waitForRedeliveries();
1088        synchronized(deliveredMessages) {
1089            // Acknowledge all messages so far.
1090            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1091            if (ack == null) {
1092                return; // no msgs
1093            }
1094
1095            if (session.getTransacted()) {
1096                rollbackOnFailedRecoveryRedelivery();
1097                session.doStartTransaction();
1098                ack.setTransactionId(session.getTransactionContext().getTransactionId());
1099            }
1100
1101            pendingAck = null;
1102            session.sendAck(ack);
1103
1104            // Adjust the counters
1105            deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1106            additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1107
1108            if (!session.getTransacted()) {
1109                deliveredMessages.clear();
1110            }
1111        }
1112    }
1113
1114    private void waitForRedeliveries() {
1115        if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1116            long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1117            int numberNotReplayed;
1118            do {
1119                numberNotReplayed = 0;
1120                synchronized(deliveredMessages) {
1121                    if (previouslyDeliveredMessages != null) {
1122                        for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1123                            if (!entry.getValue()) {
1124                                numberNotReplayed++;
1125                            }
1126                        }
1127                    }
1128                }
1129                if (numberNotReplayed > 0) {
1130                    LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}",
1131                             numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId);
1132                    try {
1133                        Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1134                    } catch (InterruptedException outOfhere) {
1135                        break;
1136                    }
1137                }
1138            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1139        }
1140    }
1141
1142    /*
1143     * called with deliveredMessages locked
1144     */
1145    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1146        if (previouslyDeliveredMessages != null) {
1147            // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1148            // as messages have been dispatched else where.
1149            int numberNotReplayed = 0;
1150            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1151                if (!entry.getValue()) {
1152                    numberNotReplayed++;
1153                    LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}",
1154                              previouslyDeliveredMessages.transactionId, entry.getKey());
1155                }
1156            }
1157            if (numberNotReplayed > 0) {
1158                String message = "rolling back transaction ("
1159                    + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1160                    + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1161                LOG.warn(message);
1162                throw new TransactionRolledBackException(message);
1163            }
1164        }
1165    }
1166
1167    void acknowledge(MessageDispatch md) throws JMSException {
1168        acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
1169    }
1170
1171    void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
1172        MessageAck ack = new MessageAck(md, ackType, 1);
1173        if (ack.isExpiredAck()) {
1174            ack.setFirstMessageId(ack.getLastMessageId());
1175        }
1176        session.sendAck(ack);
1177        synchronized(deliveredMessages){
1178            deliveredMessages.remove(md);
1179        }
1180    }
1181
1182    public void commit() throws JMSException {
1183        synchronized (deliveredMessages) {
1184            deliveredMessages.clear();
1185            clearPreviouslyDelivered();
1186        }
1187        redeliveryDelay = 0;
1188    }
1189
1190    public void rollback() throws JMSException {
1191        clearDeliveredList();
1192        synchronized (unconsumedMessages.getMutex()) {
1193            if (optimizeAcknowledge) {
1194                // remove messages read but not acked at the broker yet through
1195                // optimizeAcknowledge
1196                if (!this.info.isBrowser()) {
1197                    synchronized(deliveredMessages) {
1198                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1199                            // ensure we don't filter this as a duplicate
1200                            MessageDispatch md = deliveredMessages.removeLast();
1201                            session.connection.rollbackDuplicate(this, md.getMessage());
1202                        }
1203                    }
1204                }
1205            }
1206            synchronized(deliveredMessages) {
1207                rollbackPreviouslyDeliveredAndNotRedelivered();
1208                if (deliveredMessages.isEmpty()) {
1209                    return;
1210                }
1211
1212                // use initial delay for first redelivery
1213                MessageDispatch lastMd = deliveredMessages.getFirst();
1214                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1215                if (currentRedeliveryCount > 0) {
1216                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1217                } else {
1218                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1219                }
1220                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1221
1222                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1223                    MessageDispatch md = iter.next();
1224                    md.getMessage().onMessageRolledBack();
1225                    // ensure we don't filter this as a duplicate
1226                    session.connection.rollbackDuplicate(this, md.getMessage());
1227                }
1228
1229                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1230                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1231                    // We need to NACK the messages so that they get sent to the
1232                    // DLQ.
1233                    // Acknowledge the last message.
1234
1235                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1236                    ack.setFirstMessageId(firstMsgId);
1237                    ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy
1238                            + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
1239                    session.sendAck(ack,true);
1240                    // Adjust the window size.
1241                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1242                    redeliveryDelay = 0;
1243
1244                    deliveredCounter -= deliveredMessages.size();
1245                    deliveredMessages.clear();
1246
1247                } else {
1248
1249                    // only redelivery_ack after first delivery
1250                    if (currentRedeliveryCount > 0) {
1251                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1252                        ack.setFirstMessageId(firstMsgId);
1253                        session.sendAck(ack,true);
1254                    }
1255
1256                    // stop the delivery of messages.
1257                    if (nonBlockingRedelivery) {
1258                        if (!unconsumedMessages.isClosed()) {
1259
1260                            final LinkedList<MessageDispatch> pendingRedeliveries =
1261                                new LinkedList<MessageDispatch>(deliveredMessages);
1262
1263                            Collections.reverse(pendingRedeliveries);
1264
1265                            deliveredCounter -= deliveredMessages.size();
1266                            deliveredMessages.clear();
1267
1268                            // Start up the delivery again a little later.
1269                            session.getScheduler().executeAfterDelay(new Runnable() {
1270                                @Override
1271                                public void run() {
1272                                    try {
1273                                        if (!unconsumedMessages.isClosed()) {
1274                                            for(MessageDispatch dispatch : pendingRedeliveries) {
1275                                                session.dispatch(dispatch);
1276                                            }
1277                                        }
1278                                    } catch (Exception e) {
1279                                        session.connection.onAsyncException(e);
1280                                    }
1281                                }
1282                            }, redeliveryDelay);
1283                        }
1284
1285                    } else {
1286                        unconsumedMessages.stop();
1287
1288                        for (MessageDispatch md : deliveredMessages) {
1289                            unconsumedMessages.enqueueFirst(md);
1290                        }
1291
1292                        deliveredCounter -= deliveredMessages.size();
1293                        deliveredMessages.clear();
1294
1295                        if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1296                            // Start up the delivery again a little later.
1297                            session.getScheduler().executeAfterDelay(new Runnable() {
1298                                @Override
1299                                public void run() {
1300                                    try {
1301                                        if (started.get()) {
1302                                            start();
1303                                        }
1304                                    } catch (JMSException e) {
1305                                        session.connection.onAsyncException(e);
1306                                    }
1307                                }
1308                            }, redeliveryDelay);
1309                        } else {
1310                            start();
1311                        }
1312                    }
1313                }
1314            }
1315        }
1316        if (messageListener.get() != null) {
1317            session.redispatch(this, unconsumedMessages);
1318        }
1319    }
1320
1321    /*
1322     * called with unconsumedMessages && deliveredMessages locked
1323     * remove any message not re-delivered as they can't be replayed to this
1324     * consumer on rollback
1325     */
1326    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1327        if (previouslyDeliveredMessages != null) {
1328            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1329                if (!entry.getValue()) {
1330                    LOG.trace("rollback non redelivered: {}" + entry.getKey());
1331                    removeFromDeliveredMessages(entry.getKey());
1332                }
1333            }
1334            clearPreviouslyDelivered();
1335        }
1336    }
1337
1338    /*
1339     * called with deliveredMessages locked
1340     */
1341    private void removeFromDeliveredMessages(MessageId key) {
1342        Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1343        while (iterator.hasNext()) {
1344            MessageDispatch candidate = iterator.next();
1345            if (key.equals(candidate.getMessage().getMessageId())) {
1346                session.connection.rollbackDuplicate(this, candidate.getMessage());
1347                iterator.remove();
1348                break;
1349            }
1350        }
1351    }
1352
1353    /*
1354     * called with deliveredMessages locked
1355     */
1356    private void clearPreviouslyDelivered() {
1357        if (previouslyDeliveredMessages != null) {
1358            previouslyDeliveredMessages.clear();
1359            previouslyDeliveredMessages = null;
1360        }
1361    }
1362
1363    @Override
1364    public void dispatch(MessageDispatch md) {
1365        MessageListener listener = this.messageListener.get();
1366        try {
1367            clearMessagesInProgress();
1368            clearDeliveredList();
1369            synchronized (unconsumedMessages.getMutex()) {
1370                if (!unconsumedMessages.isClosed()) {
1371                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1372                        if (listener != null && unconsumedMessages.isRunning()) {
1373                            if (redeliveryExceeded(md)) {
1374                                posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
1375                                return;
1376                            }
1377                            ActiveMQMessage message = createActiveMQMessage(md);
1378                            beforeMessageIsConsumed(md);
1379                            try {
1380                                boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
1381                                if (!expired) {
1382                                    listener.onMessage(message);
1383                                }
1384                                afterMessageIsConsumed(md, expired);
1385                            } catch (RuntimeException e) {
1386                                LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
1387                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1388                                    // schedual redelivery and possible dlq processing
1389                                    md.setRollbackCause(e);
1390                                    rollback();
1391                                } else {
1392                                    // Transacted or Client ack: Deliver the
1393                                    // next message.
1394                                    afterMessageIsConsumed(md, false);
1395                                }
1396                            }
1397                        } else {
1398                            if (!unconsumedMessages.isRunning()) {
1399                                // delayed redelivery, ensure it can be re delivered
1400                                session.connection.rollbackDuplicate(this, md.getMessage());
1401                            }
1402
1403                            if (md.getMessage() == null) {
1404                                // End of browse or pull request timeout.
1405                                unconsumedMessages.enqueue(md);
1406                            } else {
1407                                if (!consumeExpiredMessage(md)) {
1408                                    unconsumedMessages.enqueue(md);
1409                                    if (availableListener != null) {
1410                                        availableListener.onMessageAvailable(this);
1411                                    }
1412                                } else {
1413                                    beforeMessageIsConsumed(md);
1414                                    afterMessageIsConsumed(md, true);
1415
1416                                    // Pull consumer needs to check if pull timed out and send
1417                                    // a new pull command if not.
1418                                    if (info.getCurrentPrefetchSize() == 0) {
1419                                        unconsumedMessages.enqueue(null);
1420                                    }
1421                                }
1422                            }
1423                        }
1424                    } else {
1425                        // deal with duplicate delivery
1426                        ConsumerId consumerWithPendingTransaction;
1427                        if (redeliveryExpectedInCurrentTransaction(md, true)) {
1428                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
1429                            if (transactedIndividualAck) {
1430                                immediateIndividualTransactedAck(md);
1431                            } else {
1432                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
1433                            }
1434                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
1435                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
1436                            session.getConnection().rollbackDuplicate(this, md.getMessage());
1437                            dispatch(md);
1438                        } else {
1439                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
1440                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
1441                        }
1442                    }
1443                }
1444            }
1445            if (++dispatchedCount % 1000 == 0) {
1446                dispatchedCount = 0;
1447                Thread.yield();
1448            }
1449        } catch (Exception e) {
1450            session.connection.onClientInternalException(e);
1451        }
1452    }
1453
1454    private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
1455        if (session.isTransacted()) {
1456            synchronized (deliveredMessages) {
1457                if (previouslyDeliveredMessages != null) {
1458                    if (previouslyDeliveredMessages.containsKey(md.getMessage().getMessageId())) {
1459                        if (markReceipt) {
1460                            previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1461                        }
1462                        return true;
1463                    }
1464                }
1465            }
1466        }
1467        return false;
1468    }
1469
1470    private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
1471        for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
1472            for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
1473                if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
1474                    return activeMQMessageConsumer.getConsumerId();
1475                }
1476            }
1477        }
1478        return null;
1479    }
1480
1481    // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1482    private void clearDeliveredList() {
1483        if (clearDeliveredList) {
1484            synchronized (deliveredMessages) {
1485                if (clearDeliveredList) {
1486                    if (!deliveredMessages.isEmpty()) {
1487                        if (session.isTransacted()) {
1488
1489                            if (previouslyDeliveredMessages == null) {
1490                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1491                            }
1492                            for (MessageDispatch delivered : deliveredMessages) {
1493                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1494                            }
1495                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
1496                                      getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
1497                        } else {
1498                            if (session.isClientAcknowledge()) {
1499                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
1500                                // allow redelivery
1501                                if (!this.info.isBrowser()) {
1502                                    for (MessageDispatch md: deliveredMessages) {
1503                                        this.session.connection.rollbackDuplicate(this, md.getMessage());
1504                                    }
1505                                }
1506                            }
1507                            LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
1508                            deliveredMessages.clear();
1509                            pendingAck = null;
1510                        }
1511                    }
1512                    clearDeliveredList = false;
1513                }
1514            }
1515        }
1516    }
1517
1518    public int getMessageSize() {
1519        return unconsumedMessages.size();
1520    }
1521
1522    public void start() throws JMSException {
1523        if (unconsumedMessages.isClosed()) {
1524            return;
1525        }
1526        started.set(true);
1527        unconsumedMessages.start();
1528        session.executor.wakeup();
1529    }
1530
1531    public void stop() {
1532        started.set(false);
1533        unconsumedMessages.stop();
1534    }
1535
1536    @Override
1537    public String toString() {
1538        return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1539               + " }";
1540    }
1541
1542    /**
1543     * Delivers a message to the message listener.
1544     *
1545     * @return
1546     * @throws JMSException
1547     */
1548    public boolean iterate() {
1549        MessageListener listener = this.messageListener.get();
1550        if (listener != null) {
1551            MessageDispatch md = unconsumedMessages.dequeueNoWait();
1552            if (md != null) {
1553                dispatch(md);
1554                return true;
1555            }
1556        }
1557        return false;
1558    }
1559
1560    public boolean isInUse(ActiveMQTempDestination destination) {
1561        return info.getDestination().equals(destination);
1562    }
1563
1564    public long getLastDeliveredSequenceId() {
1565        return lastDeliveredSequenceId;
1566    }
1567
1568    public IOException getFailureError() {
1569        return failureError;
1570    }
1571
1572    public void setFailureError(IOException failureError) {
1573        this.failureError = failureError;
1574    }
1575
1576    /**
1577     * @return the optimizedAckScheduledAckInterval
1578     */
1579    public long getOptimizedAckScheduledAckInterval() {
1580        return optimizedAckScheduledAckInterval;
1581    }
1582
1583    /**
1584     * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
1585     */
1586    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
1587        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1588
1589        if (this.optimizedAckTask != null) {
1590            try {
1591                this.session.connection.getScheduler().cancel(optimizedAckTask);
1592            } catch (JMSException e) {
1593                LOG.debug("Caught exception while cancelling old optimized ack task", e);
1594                throw e;
1595            }
1596            this.optimizedAckTask = null;
1597        }
1598
1599        // Should we periodically send out all outstanding acks.
1600        if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
1601            this.optimizedAckTask = new Runnable() {
1602
1603                @Override
1604                public void run() {
1605                    try {
1606                        if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
1607                            LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
1608                            deliverAcks();
1609                        }
1610                    } catch (Exception e) {
1611                        LOG.debug("Optimized Ack Task caught exception during ack", e);
1612                    }
1613                }
1614            };
1615
1616            try {
1617                this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
1618            } catch (JMSException e) {
1619                LOG.debug("Caught exception while scheduling new optimized ack task", e);
1620                throw e;
1621            }
1622        }
1623    }
1624
1625    public boolean hasMessageListener() {
1626        return messageListener.get() != null;
1627    }
1628
1629    public boolean isConsumerExpiryCheckEnabled() {
1630        return consumerExpiryCheckEnabled;
1631    }
1632
1633    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
1634        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
1635    }
1636}