001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicReference;
033
034import javax.jms.IllegalStateException;
035import javax.jms.InvalidDestinationException;
036import javax.jms.JMSException;
037import javax.jms.Message;
038import javax.jms.MessageConsumer;
039import javax.jms.MessageListener;
040import javax.jms.TransactionRolledBackException;
041
042import org.apache.activemq.blob.BlobDownloader;
043import org.apache.activemq.command.*;
044import org.apache.activemq.management.JMSConsumerStatsImpl;
045import org.apache.activemq.management.StatsCapable;
046import org.apache.activemq.management.StatsImpl;
047import org.apache.activemq.selector.SelectorParser;
048import org.apache.activemq.transaction.Synchronization;
049import org.apache.activemq.util.Callback;
050import org.apache.activemq.util.IntrospectionSupport;
051import org.apache.activemq.util.JMSExceptionSupport;
052import org.apache.activemq.util.ThreadPoolUtils;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
058 * from a destination. A <CODE> MessageConsumer</CODE> object is created by
059 * passing a <CODE>Destination</CODE> object to a message-consumer creation
060 * method supplied by a session.
061 * <P>
062 * <CODE>MessageConsumer</CODE> is the parent interface for all message
063 * consumers.
064 * <P>
065 * A message consumer can be created with a message selector. A message selector
066 * allows the client to restrict the messages delivered to the message consumer
067 * to those that match the selector.
068 * <P>
069 * A client may either synchronously receive a message consumer's messages or
070 * have the consumer asynchronously deliver them as they arrive.
071 * <P>
072 * For synchronous receipt, a client can request the next message from a message
073 * consumer using one of its <CODE> receive</CODE> methods. There are several
074 * variations of <CODE>receive</CODE> that allow a client to poll or wait for
075 * the next message.
076 * <P>
077 * For asynchronous delivery, a client can register a
078 * <CODE>MessageListener</CODE> object with a message consumer. As messages
079 * arrive at the message consumer, it delivers them by calling the
080 * <CODE>MessageListener</CODE>'s<CODE>
081 * onMessage</CODE> method.
082 * <P>
083 * It is a client programming error for a <CODE>MessageListener</CODE> to
084 * throw an exception.
085 *
086 *
087 * @see javax.jms.MessageConsumer
088 * @see javax.jms.QueueReceiver
089 * @see javax.jms.TopicSubscriber
090 * @see javax.jms.Session
091 */
092public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
093
094    @SuppressWarnings("serial")
095    class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
096        final TransactionId transactionId;
097        public PreviouslyDeliveredMap(TransactionId transactionId) {
098            this.transactionId = transactionId;
099        }
100    }
101    class PreviouslyDelivered {
102        org.apache.activemq.command.Message message;
103        boolean redelivered;
104
105        PreviouslyDelivered(MessageDispatch messageDispatch) {
106            message = messageDispatch.getMessage();
107        }
108
109        PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered) {
110            message = messageDispatch.getMessage();
111            this.redelivered = redelivered;
112        }
113    }
114
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
    // The session that owns this consumer; the constructor registers the consumer with it.
    protected final ActiveMQSession session;
    // Consumer description that the constructor populates and sends to the broker.
    protected final ConsumerInfo info;

    // These are the messages waiting to be delivered to the client.
    // The constructor picks a priority-aware or FIFO channel depending on the connection.
    protected final MessageDispatchChannel unconsumedMessages;

    // These are the messages that were delivered to the consumer but that have
    // not been acknowledged. The list is kept in reverse order since we
    // always walk it in reverse order.
    protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
    // track duplicate deliveries in a transaction such that the tx integrity can be validated
    private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
    private int deliveredCounter;
    private int additionalWindowSize;
    private long redeliveryDelay;
    private int ackCounter;
    private int dispatchedCount;
    // Current asynchronous listener, or null when none is registered (see setMessageListener).
    private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
    private final JMSConsumerStatsImpl stats;

    // Validated selector expression, or null when the consumer has no selector.
    private final String selector;
    private boolean synchronizationRegistered;
    private final AtomicBoolean started = new AtomicBoolean(false);

    private MessageAvailableListener availableListener;

    // Resolved from the connection's redelivery policy map in the constructor; defaults to a new policy.
    private RedeliveryPolicy redeliveryPolicy;
    private boolean optimizeAcknowledge;
    // Guard flipped with compareAndSet(false, true) at the start of deliverAcks().
    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
    // Created lazily (single-thread executor) the first time deliverAcks() has an ack to send.
    private ExecutorService executorService;
    private MessageTransformer transformer;
    // Set to true by inProgressClearRequired().
    private volatile boolean clearDeliveredList;
    // Incremented by inProgressClearRequired(); reset to 0 by clearMessagesInProgress().
    AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);

    private MessageAck pendingAck;
    // Reported to the broker in the remove command sent by doClose(); -1 is the initial value.
    private long lastDeliveredSequenceId = -1;

    // When non-null, dequeue() rethrows it as a JMSException instead of returning null.
    private IOException failureError;

    private long optimizeAckTimestamp = System.currentTimeMillis();
    // The two optimize-ack settings below are copied from the connection in the
    // constructor, and only when optimizeAcknowledge is enabled.
    private long optimizeAcknowledgeTimeOut = 0;
    private long optimizedAckScheduledAckInterval = 0;
    private Runnable optimizedAckTask;
    // The remaining settings are initialised from the connection in the constructor.
    private long failoverRedeliveryWaitPeriod = 0;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
    private boolean consumerExpiryCheckEnabled = true;
163
164    /**
165     * Create a MessageConsumer
166     *
167     * @param session
168     * @param dest
169     * @param name
170     * @param selector
171     * @param prefetch
172     * @param maximumPendingMessageCount
173     * @param noLocal
174     * @param browser
175     * @param dispatchAsync
176     * @param messageListener
177     * @throws JMSException
178     */
179    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
180            String name, String selector, int prefetch,
181            int maximumPendingMessageCount, boolean noLocal, boolean browser,
182            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
183        if (dest == null) {
184            throw new InvalidDestinationException("Don't understand null destinations");
185        } else if (dest.getPhysicalName() == null) {
186            throw new InvalidDestinationException("The destination object was not given a physical name.");
187        } else if (dest.isTemporary()) {
188            String physicalName = dest.getPhysicalName();
189
190            if (physicalName == null) {
191                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
192            }
193
194            String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
195
196            if (physicalName.indexOf(connectionID) < 0) {
197                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
198            }
199
200            if (session.connection.isDeleted(dest)) {
201                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
202            }
203            if (prefetch < 0) {
204                throw new JMSException("Cannot have a prefetch size less than zero");
205            }
206        }
207        if (session.connection.isMessagePrioritySupported()) {
208            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
209        }else {
210            this.unconsumedMessages = new FifoMessageDispatchChannel();
211        }
212
213        this.session = session;
214        this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
215        if (this.redeliveryPolicy == null) {
216            this.redeliveryPolicy = new RedeliveryPolicy();
217        }
218        setTransformer(session.getTransformer());
219
220        this.info = new ConsumerInfo(consumerId);
221        this.info.setExclusive(this.session.connection.isExclusiveConsumer());
222        this.info.setClientId(this.session.connection.getClientID());
223        this.info.setSubscriptionName(name);
224        this.info.setPrefetchSize(prefetch);
225        this.info.setCurrentPrefetchSize(prefetch);
226        this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
227        this.info.setNoLocal(noLocal);
228        this.info.setDispatchAsync(dispatchAsync);
229        this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
230        this.info.setSelector(null);
231
232        // Allows the options on the destination to configure the consumerInfo
233        if (dest.getOptions() != null) {
234            Map<String, Object> options = IntrospectionSupport.extractProperties(
235                new HashMap<String, Object>(dest.getOptions()), "consumer.");
236            IntrospectionSupport.setProperties(this.info, options);
237            if (options.size() > 0) {
238                String msg = "There are " + options.size()
239                    + " consumer options that couldn't be set on the consumer."
240                    + " Check the options are spelled correctly."
241                    + " Unknown parameters=[" + options + "]."
242                    + " This consumer cannot be started.";
243                LOG.warn(msg);
244                throw new ConfigurationException(msg);
245            }
246        }
247
248        this.info.setDestination(dest);
249        this.info.setBrowser(browser);
250        if (selector != null && selector.trim().length() != 0) {
251            // Validate the selector
252            SelectorParser.parse(selector);
253            this.info.setSelector(selector);
254            this.selector = selector;
255        } else if (info.getSelector() != null) {
256            // Validate the selector
257            SelectorParser.parse(this.info.getSelector());
258            this.selector = this.info.getSelector();
259        } else {
260            this.selector = null;
261        }
262
263        this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
264        this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
265                                   && !info.isBrowser();
266        if (this.optimizeAcknowledge) {
267            this.optimizeAcknowledgeTimeOut = session.connection.getOptimizeAcknowledgeTimeOut();
268            setOptimizedAckScheduledAckInterval(session.connection.getOptimizedAckScheduledAckInterval());
269        }
270
271        this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
272        this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
273        this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
274        this.transactedIndividualAck = session.connection.isTransactedIndividualAck()
275                        || this.nonBlockingRedelivery
276                        || session.connection.isMessagePrioritySupported();
277        this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled();
278        if (messageListener != null) {
279            setMessageListener(messageListener);
280        }
281        try {
282            this.session.addConsumer(this);
283            this.session.syncSendPacket(info);
284        } catch (JMSException e) {
285            this.session.removeConsumer(this);
286            throw e;
287        }
288
289        if (session.connection.isStarted()) {
290            start();
291        }
292    }
293
294    private boolean isAutoAcknowledgeEach() {
295        return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
296    }
297
298    private boolean isAutoAcknowledgeBatch() {
299        return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
300    }
301
302    @Override
303    public StatsImpl getStats() {
304        return stats;
305    }
306
307    public JMSConsumerStatsImpl getConsumerStats() {
308        return stats;
309    }
310
311    public RedeliveryPolicy getRedeliveryPolicy() {
312        return redeliveryPolicy;
313    }
314
315    /**
316     * Sets the redelivery policy used when messages are redelivered
317     */
318    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
319        this.redeliveryPolicy = redeliveryPolicy;
320    }
321
322    public MessageTransformer getTransformer() {
323        return transformer;
324    }
325
326    /**
327     * Sets the transformer used to transform messages before they are sent on
328     * to the JMS bus
329     */
330    public void setTransformer(MessageTransformer transformer) {
331        this.transformer = transformer;
332    }
333
334    /**
335     * @return Returns the value.
336     */
337    public ConsumerId getConsumerId() {
338        return info.getConsumerId();
339    }
340
341    /**
342     * @return the consumer name - used for durable consumers
343     */
344    public String getConsumerName() {
345        return this.info.getSubscriptionName();
346    }
347
348    /**
349     * @return true if this consumer does not accept locally produced messages
350     */
351    protected boolean isNoLocal() {
352        return info.isNoLocal();
353    }
354
355    /**
356     * Retrieve is a browser
357     *
358     * @return true if a browser
359     */
360    protected boolean isBrowser() {
361        return info.isBrowser();
362    }
363
364    /**
365     * @return ActiveMQDestination
366     */
367    protected ActiveMQDestination getDestination() {
368        return info.getDestination();
369    }
370
371    /**
372     * @return Returns the prefetchNumber.
373     */
374    public int getPrefetchNumber() {
375        return info.getPrefetchSize();
376    }
377
378    /**
379     * @return true if this is a durable topic subscriber
380     */
381    public boolean isDurableSubscriber() {
382        return info.getSubscriptionName() != null && info.getDestination().isTopic();
383    }
384
385    /**
386     * Gets this message consumer's message selector expression.
387     *
388     * @return this message consumer's message selector, or null if no message
389     *         selector exists for the message consumer (that is, if the message
390     *         selector was not set or was set to null or the empty string)
391     * @throws JMSException if the JMS provider fails to receive the next
392     *                 message due to some internal error.
393     */
394    @Override
395    public String getMessageSelector() throws JMSException {
396        checkClosed();
397        return selector;
398    }
399
400    /**
401     * Gets the message consumer's <CODE>MessageListener</CODE>.
402     *
403     * @return the listener for the message consumer, or null if no listener is
404     *         set
405     * @throws JMSException if the JMS provider fails to get the message
406     *                 listener due to some internal error.
407     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
408     */
409    @Override
410    public MessageListener getMessageListener() throws JMSException {
411        checkClosed();
412        return this.messageListener.get();
413    }
414
415    /**
416     * Sets the message consumer's <CODE>MessageListener</CODE>.
417     * <P>
418     * Setting the message listener to null is the equivalent of unsetting the
419     * message listener for the message consumer.
420     * <P>
421     * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
422     * while messages are being consumed by an existing listener or the consumer
423     * is being used to consume messages synchronously is undefined.
424     *
425     * @param listener the listener to which the messages are to be delivered
426     * @throws JMSException if the JMS provider fails to receive the next
427     *                 message due to some internal error.
428     * @see javax.jms.MessageConsumer#getMessageListener
429     */
430    @Override
431    public void setMessageListener(MessageListener listener) throws JMSException {
432        checkClosed();
433        if (info.getPrefetchSize() == 0) {
434            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
435        }
436        if (listener != null) {
437            boolean wasRunning = session.isRunning();
438            if (wasRunning) {
439                session.stop();
440            }
441
442            this.messageListener.set(listener);
443            session.redispatch(this, unconsumedMessages);
444
445            if (wasRunning) {
446                session.start();
447            }
448        } else {
449            this.messageListener.set(null);
450        }
451    }
452
453    @Override
454    public MessageAvailableListener getAvailableListener() {
455        return availableListener;
456    }
457
458    /**
459     * Sets the listener used to notify synchronous consumers that there is a
460     * message available so that the {@link MessageConsumer#receiveNoWait()} can
461     * be called.
462     */
463    @Override
464    public void setAvailableListener(MessageAvailableListener availableListener) {
465        this.availableListener = availableListener;
466    }
467
468    /**
469     * Used to get an enqueued message from the unconsumedMessages list. The
470     * amount of time this method blocks is based on the timeout value. - if
471     * timeout==-1 then it blocks until a message is received. - if timeout==0
472     * then it it tries to not block at all, it returns a message if it is
473     * available - if timeout>0 then it blocks up to timeout amount of time.
474     * Expired messages will consumed by this method.
475     *
476     * @throws JMSException
477     * @return null if we timeout or if the consumer is closed.
478     */
479    private MessageDispatch dequeue(long timeout) throws JMSException {
480        try {
481            long deadline = 0;
482            if (timeout > 0) {
483                deadline = System.currentTimeMillis() + timeout;
484            }
485            while (true) {
486                MessageDispatch md = unconsumedMessages.dequeue(timeout);
487                if (md == null) {
488                    if (timeout > 0 && !unconsumedMessages.isClosed()) {
489                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
490                    } else {
491                        if (failureError != null) {
492                            throw JMSExceptionSupport.create(failureError);
493                        } else {
494                            return null;
495                        }
496                    }
497                } else if (md.getMessage() == null) {
498                    return null;
499                } else if (consumeExpiredMessage(md)) {
500                    LOG.debug("{} received expired message: {}", getConsumerId(), md);
501                    beforeMessageIsConsumed(md);
502                    afterMessageIsConsumed(md, true);
503                    if (timeout > 0) {
504                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
505                    }
506                    sendPullCommand(timeout);
507                } else if (redeliveryExceeded(md)) {
508                    LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
509                    posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
510                    if (timeout > 0) {
511                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
512                    }
513                    sendPullCommand(timeout);
514                } else {
515                    if (LOG.isTraceEnabled()) {
516                        LOG.trace(getConsumerId() + " received message: " + md);
517                    }
518                    return md;
519                }
520            }
521        } catch (InterruptedException e) {
522            Thread.currentThread().interrupt();
523            throw JMSExceptionSupport.create(e);
524        }
525    }
526
527    private boolean consumeExpiredMessage(MessageDispatch dispatch) {
528        return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
529    }
530
531    private void posionAck(MessageDispatch md, String cause) throws JMSException {
532        MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
533        posionAck.setFirstMessageId(md.getMessage().getMessageId());
534        posionAck.setPoisonCause(new Throwable(cause));
535        session.sendAck(posionAck);
536    }
537
538    private boolean redeliveryExceeded(MessageDispatch md) {
539        try {
540            return session.getTransacted()
541                    && redeliveryPolicy != null
542                    && redeliveryPolicy.isPreDispatchCheck()
543                    && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
544                    && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
545                    // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
546                    && md.getMessage().getProperty("redeliveryDelay") == null;
547        } catch (Exception ignored) {
548            return false;
549        }
550    }
551
552    /**
553     * Receives the next message produced for this message consumer.
554     * <P>
555     * This call blocks indefinitely until a message is produced or until this
556     * message consumer is closed.
557     * <P>
558     * If this <CODE>receive</CODE> is done within a transaction, the consumer
559     * retains the message until the transaction commits.
560     *
561     * @return the next message produced for this message consumer, or null if
562     *         this message consumer is concurrently closed
563     */
564    @Override
565    public Message receive() throws JMSException {
566        checkClosed();
567        checkMessageListener();
568
569        sendPullCommand(0);
570        MessageDispatch md = dequeue(-1);
571        if (md == null) {
572            return null;
573        }
574
575        beforeMessageIsConsumed(md);
576        afterMessageIsConsumed(md, false);
577
578        return createActiveMQMessage(md);
579    }
580
581    /**
582     * @param md
583     * @return
584     */
585    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
586        ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
587        if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
588            ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
589        }
590        if (m.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
591            ((ActiveMQObjectMessage)m).setTrustAllPackages(session.getConnection().isTrustAllPackages());
592            ((ActiveMQObjectMessage)m).setTrustedPackages(session.getConnection().getTrustedPackages());
593        }
594        if (transformer != null) {
595            Message transformedMessage = transformer.consumerTransform(session, this, m);
596            if (transformedMessage != null) {
597                m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
598            }
599        }
600        if (session.isClientAcknowledge()) {
601            m.setAcknowledgeCallback(new Callback() {
602                @Override
603                public void execute() throws Exception {
604                    session.checkClosed();
605                    session.acknowledge();
606                }
607            });
608        } else if (session.isIndividualAcknowledge()) {
609            m.setAcknowledgeCallback(new Callback() {
610                @Override
611                public void execute() throws Exception {
612                    session.checkClosed();
613                    acknowledge(md);
614                }
615            });
616        }
617        return m;
618    }
619
620    /**
621     * Receives the next message that arrives within the specified timeout
622     * interval.
623     * <P>
624     * This call blocks until a message arrives, the timeout expires, or this
625     * message consumer is closed. A <CODE>timeout</CODE> of zero never
626     * expires, and the call blocks indefinitely.
627     *
628     * @param timeout the timeout value (in milliseconds), a time out of zero
629     *                never expires.
630     * @return the next message produced for this message consumer, or null if
631     *         the timeout expires or this message consumer is concurrently
632     *         closed
633     */
634    @Override
635    public Message receive(long timeout) throws JMSException {
636        checkClosed();
637        checkMessageListener();
638        if (timeout == 0) {
639            return this.receive();
640        }
641
642        sendPullCommand(timeout);
643        while (timeout > 0) {
644
645            MessageDispatch md;
646            if (info.getPrefetchSize() == 0) {
647                md = dequeue(-1); // We let the broker let us know when we timeout.
648            } else {
649                md = dequeue(timeout);
650            }
651
652            if (md == null) {
653                return null;
654            }
655
656            beforeMessageIsConsumed(md);
657            afterMessageIsConsumed(md, false);
658            return createActiveMQMessage(md);
659        }
660        return null;
661    }
662
663    /**
664     * Receives the next message if one is immediately available.
665     *
666     * @return the next message produced for this message consumer, or null if
667     *         one is not available
668     * @throws JMSException if the JMS provider fails to receive the next
669     *                 message due to some internal error.
670     */
671    @Override
672    public Message receiveNoWait() throws JMSException {
673        checkClosed();
674        checkMessageListener();
675        sendPullCommand(-1);
676
677        MessageDispatch md;
678        if (info.getPrefetchSize() == 0) {
679            md = dequeue(-1); // We let the broker let us know when we
680            // timeout.
681        } else {
682            md = dequeue(0);
683        }
684
685        if (md == null) {
686            return null;
687        }
688
689        beforeMessageIsConsumed(md);
690        afterMessageIsConsumed(md, false);
691        return createActiveMQMessage(md);
692    }
693
694    /**
695     * Closes the message consumer.
696     * <P>
697     * Since a provider may allocate some resources on behalf of a <CODE>
698     * MessageConsumer</CODE>
699     * outside the Java virtual machine, clients should close them when they are
700     * not needed. Relying on garbage collection to eventually reclaim these
701     * resources may not be timely enough.
702     * <P>
703     * This call blocks until a <CODE>receive</CODE> or message listener in
704     * progress has completed. A blocked message consumer <CODE>receive </CODE>
705     * call returns null when this message consumer is closed.
706     *
707     * @throws JMSException if the JMS provider fails to close the consumer due
708     *                 to some internal error.
709     */
710    @Override
711    public void close() throws JMSException {
712        if (!unconsumedMessages.isClosed()) {
713            if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) {
714                session.getTransactionContext().addSynchronization(new Synchronization() {
715                    @Override
716                    public void afterCommit() throws Exception {
717                        doClose();
718                    }
719
720                    @Override
721                    public void afterRollback() throws Exception {
722                        doClose();
723                    }
724                });
725            } else {
726                doClose();
727            }
728        }
729    }
730
731    void doClose() throws JMSException {
732        dispose();
733        RemoveInfo removeCommand = info.createRemoveCommand();
734        LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
735        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
736        this.session.asyncSendPacket(removeCommand);
737    }
738
739    void inProgressClearRequired() {
740        inProgressClearRequiredFlag.incrementAndGet();
741        // deal with delivered messages async to avoid lock contention with in progress acks
742        clearDeliveredList = true;
743        // force a rollback if we will be acking in a transaction after/during failover
744        // bc acks are async they may not get there reliably on reconnect and the consumer
745        // may not be aware of the reconnect in a timely fashion if in onMessage
746        if (!deliveredMessages.isEmpty() && session.getTransactionContext().isInTransaction()) {
747            session.getTransactionContext().setRollbackOnly(true);
748        }
749    }
750
751    void clearMessagesInProgress() {
752        if (inProgressClearRequiredFlag.get() > 0) {
753            synchronized (unconsumedMessages.getMutex()) {
754                if (inProgressClearRequiredFlag.get() > 0) {
755                    LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
756                    // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
757                    List<MessageDispatch> list = unconsumedMessages.removeAll();
758                    if (!this.info.isBrowser()) {
759                        for (MessageDispatch old : list) {
760                            session.connection.rollbackDuplicate(this, old.getMessage());
761                        }
762                    }
763                    // allow dispatch on this connection to resume
764                    session.connection.transportInterruptionProcessingComplete();
765                    inProgressClearRequiredFlag.set(0);
766
767                    // Wake up any blockers and allow them to recheck state.
768                    unconsumedMessages.getMutex().notifyAll();
769                }
770            }
771            clearDeliveredList();
772        }
773    }
774
775    void deliverAcks() {
776        MessageAck ack = null;
777        if (deliveryingAcknowledgements.compareAndSet(false, true)) {
778            if (isAutoAcknowledgeEach()) {
779                synchronized(deliveredMessages) {
780                    ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
781                    if (ack != null) {
782                        deliveredMessages.clear();
783                        ackCounter = 0;
784                    } else {
785                        ack = pendingAck;
786                        pendingAck = null;
787                    }
788                }
789            } else if (pendingAck != null && pendingAck.isStandardAck()) {
790                ack = pendingAck;
791                pendingAck = null;
792            }
793            if (ack != null) {
794                final MessageAck ackToSend = ack;
795
796                if (executorService == null) {
797                    executorService = Executors.newSingleThreadExecutor();
798                }
799                executorService.submit(new Runnable() {
800                    @Override
801                    public void run() {
802                        try {
803                            session.sendAck(ackToSend,true);
804                        } catch (JMSException e) {
805                            LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
806                        } finally {
807                            deliveryingAcknowledgements.set(false);
808                        }
809                    }
810                });
811            } else {
812                deliveryingAcknowledgements.set(false);
813            }
814        }
815    }
816
817    public void dispose() throws JMSException {
818        if (!unconsumedMessages.isClosed()) {
819
820            // Do we have any acks we need to send out before closing?
821            // Ack any delivered messages now.
822            if (!session.getTransacted()) {
823                deliverAcks();
824                if (isAutoAcknowledgeBatch()) {
825                    acknowledge();
826                }
827            }
828            if (executorService != null) {
829                ThreadPoolUtils.shutdownGraceful(executorService, 60000L);
830                executorService = null;
831            }
832            if (optimizedAckTask != null) {
833                this.session.connection.getScheduler().cancel(optimizedAckTask);
834                optimizedAckTask = null;
835            }
836
837            if (session.isClientAcknowledge()) {
838                if (!this.info.isBrowser()) {
839                    // rollback duplicates that aren't acknowledged
840                    List<MessageDispatch> tmp = null;
841                    synchronized (this.deliveredMessages) {
842                        tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
843                    }
844                    for (MessageDispatch old : tmp) {
845                        this.session.connection.rollbackDuplicate(this, old.getMessage());
846                    }
847                    tmp.clear();
848                }
849            }
850            if (!session.isTransacted()) {
851                synchronized(deliveredMessages) {
852                    deliveredMessages.clear();
853                }
854            }
855            unconsumedMessages.close();
856            this.session.removeConsumer(this);
857            List<MessageDispatch> list = unconsumedMessages.removeAll();
858            if (!this.info.isBrowser()) {
859                for (MessageDispatch old : list) {
860                    // ensure we don't filter this as a duplicate
861                    if (old.getMessage() != null) {
862                        LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId());
863                    }
864                    session.connection.rollbackDuplicate(this, old.getMessage());
865                }
866            }
867        }
868        if (previouslyDeliveredMessages != null) {
869            for (PreviouslyDelivered previouslyDelivered : previouslyDeliveredMessages.values()) {
870                session.connection.rollbackDuplicate(this, previouslyDelivered.message);
871            }
872        }
873        clearPreviouslyDelivered();
874    }
875
876    /**
877     * @throws IllegalStateException
878     */
879    protected void checkClosed() throws IllegalStateException {
880        if (unconsumedMessages.isClosed()) {
881            throw new IllegalStateException("The Consumer is closed");
882        }
883    }
884
885    /**
886     * If we have a zero prefetch specified then send a pull command to the
887     * broker to pull a message we are about to receive
888     */
889    protected void sendPullCommand(long timeout) throws JMSException {
890        clearDeliveredList();
891        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
892            MessagePull messagePull = new MessagePull();
893            messagePull.configure(info);
894            messagePull.setTimeout(timeout);
895            session.asyncSendPacket(messagePull);
896        }
897    }
898
    /**
     * Delegates to {@code session.checkMessageListener()}.
     *
     * @throws JMSException if the session's check fails
     */
    protected void checkMessageListener() throws JMSException {
        session.checkMessageListener();
    }
902
903    protected void setOptimizeAcknowledge(boolean value) {
904        if (optimizeAcknowledge && !value) {
905            deliverAcks();
906        }
907        optimizeAcknowledge = value;
908    }
909
    /**
     * Triggers delivery of outstanding acks via {@code deliverAcks()} and then
     * stores the given value as the current prefetch size on the consumer
     * info.
     *
     * @param prefetch the new current prefetch size
     */
    protected void setPrefetchSize(int prefetch) {
        deliverAcks();
        this.info.setCurrentPrefetchSize(prefetch);
    }
914
915    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
916        md.setDeliverySequenceId(session.getNextDeliveryId());
917        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
918        if (!isAutoAcknowledgeBatch()) {
919            synchronized(deliveredMessages) {
920                deliveredMessages.addFirst(md);
921            }
922            if (session.getTransacted()) {
923                if (transactedIndividualAck) {
924                    immediateIndividualTransactedAck(md);
925                } else {
926                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
927                }
928            }
929        }
930    }
931
932    private void immediateIndividualTransactedAck(MessageDispatch md) throws JMSException {
933        // acks accumulate on the broker pending transaction completion to indicate
934        // delivery status
935        registerSync();
936        MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
937        ack.setTransactionId(session.getTransactionContext().getTransactionId());
938        session.sendAck(ack);
939    }
940
941    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
942        if (unconsumedMessages.isClosed()) {
943            return;
944        }
945        if (messageExpired) {
946            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
947            stats.getExpiredMessageCount().increment();
948        } else {
949            stats.onMessage();
950            if (session.getTransacted()) {
951                // Do nothing.
952            } else if (isAutoAcknowledgeEach()) {
953                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
954                    synchronized (deliveredMessages) {
955                        if (!deliveredMessages.isEmpty()) {
956                            if (optimizeAcknowledge) {
957                                ackCounter++;
958
959                                // AMQ-3956 evaluate both expired and normal msgs as
960                                // otherwise consumer may get stalled
961                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
962                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
963                                    if (ack != null) {
964                                        deliveredMessages.clear();
965                                        ackCounter = 0;
966                                        session.sendAck(ack);
967                                        optimizeAckTimestamp = System.currentTimeMillis();
968                                    }
969                                    // AMQ-3956 - as further optimization send
970                                    // ack for expired msgs when there are any.
971                                    // This resets the deliveredCounter to 0 so that
972                                    // we won't sent standard acks with every msg just
973                                    // because the deliveredCounter just below
974                                    // 0.5 * prefetch as used in ackLater()
975                                    if (pendingAck != null && deliveredCounter > 0) {
976                                        session.sendAck(pendingAck);
977                                        pendingAck = null;
978                                        deliveredCounter = 0;
979                                    }
980                                }
981                            } else {
982                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
983                                if (ack!=null) {
984                                    deliveredMessages.clear();
985                                    session.sendAck(ack);
986                                }
987                            }
988                        }
989                    }
990                    deliveryingAcknowledgements.set(false);
991                }
992            } else if (isAutoAcknowledgeBatch()) {
993                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
994            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
995                boolean messageUnackedByConsumer = false;
996                synchronized (deliveredMessages) {
997                    messageUnackedByConsumer = deliveredMessages.contains(md);
998                }
999                if (messageUnackedByConsumer) {
1000                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1001                }
1002            }
1003            else {
1004                throw new IllegalStateException("Invalid session state.");
1005            }
1006        }
1007    }
1008
1009    /**
1010     * Creates a MessageAck for all messages contained in deliveredMessages.
1011     * Caller should hold the lock for deliveredMessages.
1012     *
1013     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
1014     * @return <code>null</code> if nothing to ack.
1015     */
1016    private MessageAck makeAckForAllDeliveredMessages(byte type) {
1017        synchronized (deliveredMessages) {
1018            if (deliveredMessages.isEmpty()) {
1019                return null;
1020            }
1021
1022            MessageDispatch md = deliveredMessages.getFirst();
1023            MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
1024            ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
1025            return ack;
1026        }
1027    }
1028
1029    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
1030
1031        // Don't acknowledge now, but we may need to let the broker know the
1032        // consumer got the message to expand the pre-fetch window
1033        if (session.getTransacted()) {
1034            registerSync();
1035        }
1036
1037        deliveredCounter++;
1038
1039        MessageAck oldPendingAck = pendingAck;
1040        pendingAck = new MessageAck(md, ackType, deliveredCounter);
1041        pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
1042        if( oldPendingAck==null ) {
1043            pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
1044        } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
1045            pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
1046        } else {
1047            // old pending ack being superseded by ack of another type, if is is not a delivered
1048            // ack and hence important, send it now so it is not lost.
1049            if (!oldPendingAck.isDeliveredAck()) {
1050                LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
1051                session.sendAck(oldPendingAck);
1052            } else {
1053                LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
1054            }
1055        }
1056        // AMQ-3956 evaluate both expired and normal msgs as
1057        // otherwise consumer may get stalled
1058        if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
1059            LOG.debug("ackLater: sending: {}", pendingAck);
1060            session.sendAck(pendingAck);
1061            pendingAck=null;
1062            deliveredCounter = 0;
1063            additionalWindowSize = 0;
1064        }
1065    }
1066
1067    private void registerSync() throws JMSException {
1068        session.doStartTransaction();
1069        if (!synchronizationRegistered) {
1070            synchronizationRegistered = true;
1071            session.getTransactionContext().addSynchronization(new Synchronization() {
1072                @Override
1073                public void beforeEnd() throws Exception {
1074                    if (transactedIndividualAck) {
1075                        clearDeliveredList();
1076                        waitForRedeliveries();
1077                        synchronized(deliveredMessages) {
1078                            rollbackOnFailedRecoveryRedelivery();
1079                        }
1080                    } else {
1081                        acknowledge();
1082                    }
1083                    synchronizationRegistered = false;
1084                }
1085
1086                @Override
1087                public void afterCommit() throws Exception {
1088                    commit();
1089                    synchronizationRegistered = false;
1090                }
1091
1092                @Override
1093                public void afterRollback() throws Exception {
1094                    rollback();
1095                    synchronizationRegistered = false;
1096                }
1097            });
1098        }
1099    }
1100
1101    /**
1102     * Acknowledge all the messages that have been delivered to the client up to
1103     * this point.
1104     *
1105     * @throws JMSException
1106     */
1107    public void acknowledge() throws JMSException {
1108        clearDeliveredList();
1109        waitForRedeliveries();
1110        synchronized(deliveredMessages) {
1111            // Acknowledge all messages so far.
1112            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
1113            if (ack == null) {
1114                return; // no msgs
1115            }
1116
1117            if (session.getTransacted()) {
1118                rollbackOnFailedRecoveryRedelivery();
1119                session.doStartTransaction();
1120                ack.setTransactionId(session.getTransactionContext().getTransactionId());
1121            }
1122
1123            pendingAck = null;
1124            session.sendAck(ack);
1125
1126            // Adjust the counters
1127            deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1128            additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1129
1130            if (!session.getTransacted()) {
1131                deliveredMessages.clear();
1132            }
1133        }
1134    }
1135
1136    private void waitForRedeliveries() {
1137        if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1138            long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1139            int numberNotReplayed;
1140            do {
1141                numberNotReplayed = 0;
1142                synchronized(deliveredMessages) {
1143                    if (previouslyDeliveredMessages != null) {
1144                        for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
1145                            if (!entry.redelivered) {
1146                                numberNotReplayed++;
1147                            }
1148                        }
1149                    }
1150                }
1151                if (numberNotReplayed > 0) {
1152                    LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}",
1153                             numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId);
1154                    try {
1155                        Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1156                    } catch (InterruptedException outOfhere) {
1157                        break;
1158                    }
1159                }
1160            } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1161        }
1162    }
1163
1164    /*
1165     * called with deliveredMessages locked
1166     */
1167    private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1168        if (previouslyDeliveredMessages != null) {
1169            // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1170            // as messages have been dispatched else where.
1171            int numberNotReplayed = 0;
1172            for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
1173                if (!entry.redelivered) {
1174                    numberNotReplayed++;
1175                    LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}",
1176                              previouslyDeliveredMessages.transactionId, entry.message.getMessageId());
1177                }
1178            }
1179            if (numberNotReplayed > 0) {
1180                String message = "rolling back transaction ("
1181                    + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1182                    + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1183                LOG.warn(message);
1184                throw new TransactionRolledBackException(message);
1185            }
1186        }
1187    }
1188
    /**
     * Acknowledges a single dispatch using
     * {@code MessageAck.INDIVIDUAL_ACK_TYPE}.
     *
     * @param md the dispatch to acknowledge
     * @throws JMSException if the ack cannot be sent
     */
    void acknowledge(MessageDispatch md) throws JMSException {
        acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
    }
1192
1193    void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
1194        MessageAck ack = new MessageAck(md, ackType, 1);
1195        if (ack.isExpiredAck()) {
1196            ack.setFirstMessageId(ack.getLastMessageId());
1197        }
1198        session.sendAck(ack);
1199        synchronized(deliveredMessages){
1200            deliveredMessages.remove(md);
1201        }
1202    }
1203
    /**
     * Consumer-side cleanup after a commit: while holding the
     * deliveredMessages lock, empties the delivered list and drops the
     * previously-delivered tracking, then resets the redelivery delay to 0.
     *
     * @throws JMSException declared by the signature; nothing in this body
     *         visibly throws it
     */
    public void commit() throws JMSException {
        synchronized (deliveredMessages) {
            deliveredMessages.clear();
            clearPreviouslyDelivered();
        }
        // a successful commit ends any redelivery back-off
        redeliveryDelay = 0;
    }
1211
    /**
     * Rolls back the messages currently held in the delivered list.
     * <p>
     * Every delivered message is marked as rolled back and removed from the
     * connection's duplicate tracking. If the redelivery policy defines a
     * maximum and the most recently delivered message exceeds it, a poison
     * ack covering all delivered messages is sent; otherwise the messages are
     * redelivered locally after the computed redelivery delay, either through
     * the session scheduler (non-blocking redelivery) or by re-queueing them
     * at the front of the unconsumed channel.
     * <p>
     * Locks are taken in the order unconsumedMessages mutex, then
     * deliveredMessages.
     *
     * @throws JMSException if sending an ack fails
     */
    public void rollback() throws JMSException {
        clearDeliveredList();
        synchronized (unconsumedMessages.getMutex()) {
            if (optimizeAcknowledge) {
                // remove messages read but not acked at the broker yet through
                // optimizeAcknowledge
                if (!this.info.isBrowser()) {
                    synchronized(deliveredMessages) {
                        // NOTE(review): the bound re-reads deliveredMessages.size() while
                        // removeLast() shrinks the list and i grows, so fewer than
                        // min(size, ackCounter) entries are removed - confirm this is intended.
                        for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
                            // ensure we don't filter this as a duplicate
                            MessageDispatch md = deliveredMessages.removeLast();
                            session.connection.rollbackDuplicate(this, md.getMessage());
                        }
                    }
                }
            }
            synchronized(deliveredMessages) {
                rollbackPreviouslyDeliveredAndNotRedelivered();
                if (deliveredMessages.isEmpty()) {
                    // nothing left to roll back
                    return;
                }

                // use initial delay for first redelivery
                // (the head of the list is the most recently delivered message,
                // because deliveries are added with addFirst)
                MessageDispatch lastMd = deliveredMessages.getFirst();
                final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
                if (currentRedeliveryCount > 0) {
                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                } else {
                    redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                }
                // tail of the list: the earliest delivery still held
                MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();

                for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                    MessageDispatch md = iter.next();
                    md.getMessage().onMessageRolledBack();
                    // ensure we don't filter this as a duplicate
                    session.connection.rollbackDuplicate(this, md.getMessage());
                }

                // the redelivery counter is read again here, after
                // onMessageRolledBack() has been invoked on every message
                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
                    // We need to NACK the messages so that they get sent to the
                    // DLQ.
                    // Acknowledge the last message.

                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
                    ack.setFirstMessageId(firstMsgId);
                    ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter()  + "] exceeds redelivery policy limit:" + redeliveryPolicy
                            + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
                    session.sendAck(ack,true);
                    // Adjust the window size.
                    additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
                    redeliveryDelay = 0;

                    deliveredCounter -= deliveredMessages.size();
                    deliveredMessages.clear();

                } else {

                    // only redelivery_ack after first delivery
                    if (currentRedeliveryCount > 0) {
                        MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
                        ack.setFirstMessageId(firstMsgId);
                        session.sendAck(ack,true);
                    }

                    // snapshot of what must be redelivered, taken before the
                    // delivered list is cleared below
                    final LinkedList<MessageDispatch> pendingSessionRedelivery =
                            new LinkedList<MessageDispatch>(deliveredMessages);

                    captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(false);

                    deliveredCounter -= deliveredMessages.size();
                    deliveredMessages.clear();

                    if (!unconsumedMessages.isClosed()) {

                        if (nonBlockingRedelivery) {
                            // the snapshot is newest-first; reverse it so the
                            // scheduled task dispatches in original delivery order
                            Collections.reverse(pendingSessionRedelivery);

                            // Start up the delivery again a little later.
                            session.getScheduler().executeAfterDelay(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        if (!unconsumedMessages.isClosed()) {
                                            for(MessageDispatch dispatch : pendingSessionRedelivery) {
                                                session.dispatch(dispatch);
                                            }
                                        }
                                    } catch (Exception e) {
                                        session.connection.onAsyncException(e);
                                    }
                                }
                            }, redeliveryDelay);

                        } else {
                            // stop the delivery of messages.
                            unconsumedMessages.stop();

                            final ActiveMQMessageConsumer dispatcher = this;

                            // re-queues the rolled back messages at the front of the
                            // unconsumed channel and restarts delivery
                            Runnable redispatchWork = new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        if (!unconsumedMessages.isClosed()) {
                                            synchronized (unconsumedMessages.getMutex()) {
                                                // newest-first iteration with enqueueFirst leaves
                                                // the earliest delivery at the very front
                                                for (MessageDispatch md : pendingSessionRedelivery) {
                                                    unconsumedMessages.enqueueFirst(md);
                                                }

                                                if (messageListener.get() != null) {
                                                    session.redispatch(dispatcher, unconsumedMessages);
                                                }
                                            }
                                            if (started.get()) {
                                                start();
                                            }
                                        }
                                    } catch (JMSException e) {
                                        session.connection.onAsyncException(e);
                                    }
                                }
                            };

                            if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                                // Start up the delivery again a little later.
                                session.getScheduler().executeAfterDelay(redispatchWork, redeliveryDelay);
                            } else {
                                // no delay configured: redispatch on the calling thread
                                redispatchWork.run();
                            }
                        }
                    } else {
                        // consumer already closed: no local redelivery, only release
                        // the duplicate tracking for the rolled back messages
                        for (MessageDispatch md : pendingSessionRedelivery) {
                            session.connection.rollbackDuplicate(this, md.getMessage());
                        }
                    }
                }
            }
        }
    }
1353
1354    /*
1355     * called with unconsumedMessages && deliveredMessages locked
1356     * remove any message not re-delivered as they can't be replayed to this
1357     * consumer on rollback
1358     */
1359    private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1360        if (previouslyDeliveredMessages != null) {
1361            for (PreviouslyDelivered entry: previouslyDeliveredMessages.values()) {
1362                if (!entry.redelivered) {
1363                    LOG.trace("rollback non redelivered: {}", entry.message.getMessageId());
1364                    removeFromDeliveredMessages(entry.message.getMessageId());
1365                }
1366            }
1367            clearPreviouslyDelivered();
1368        }
1369    }
1370
1371    /*
1372     * called with deliveredMessages locked
1373     */
1374    private void removeFromDeliveredMessages(MessageId key) {
1375        Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1376        while (iterator.hasNext()) {
1377            MessageDispatch candidate = iterator.next();
1378            if (key.equals(candidate.getMessage().getMessageId())) {
1379                session.connection.rollbackDuplicate(this, candidate.getMessage());
1380                iterator.remove();
1381                break;
1382            }
1383        }
1384    }
1385
1386    /*
1387     * called with deliveredMessages locked
1388     */
1389    private void clearPreviouslyDelivered() {
1390        if (previouslyDeliveredMessages != null) {
1391            previouslyDeliveredMessages.clear();
1392            previouslyDeliveredMessages = null;
1393        }
1394    }
1395
    /**
     * Processes one {@link MessageDispatch} for this consumer.
     * <p>
     * After {@code clearMessagesInProgress()} and {@code clearDeliveredList()} have run, the
     * rest of the work happens while holding the {@code unconsumedMessages} mutex and only
     * if that channel is not closed. The dispatch then takes one of three routes:
     * <ul>
     * <li>a listener is set and the channel is running: the message is handed to
     * {@code listener.onMessage} right away, unless {@code redeliveryExceeded(md)} is true
     * (poison ack, then return) or the expiry check flags it as expired;</li>
     * <li>no listener, or the channel is not running: the dispatch is enqueued on
     * {@code unconsumedMessages}, unless {@code consumeExpiredMessage(md)} returns true;</li>
     * <li>the connection reports the message as a duplicate: it is acked as an expected
     * transacted redelivery, re-dispatched after {@code rollbackDuplicate}, or poison
     * acked.</li>
     * </ul>
     * Any {@link Exception} escaping the above is passed to
     * {@code session.connection.onClientInternalException} instead of being propagated.
     *
     * @param md the dispatch to process; its message may be {@code null}, which the queueing
     *           path treats as end of browse or pull request timeout
     */
    @Override
    public void dispatch(MessageDispatch md) {
        // Read the listener once so the whole call works against a single reference.
        MessageListener listener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex()) {
                if (!unconsumedMessages.isClosed()) {
                    // deliverySequenceId non zero means previously queued dispatch
                    // Browsers and previously queued dispatches skip the connection duplicate check.
                    if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                            // Listener path: deliver directly instead of queueing.
                            if (redeliveryExceeded(md)) {
                                posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                                return;
                            }
                            ActiveMQMessage message = createActiveMQMessage(md);
                            beforeMessageIsConsumed(md);
                            try {
                                // Expired messages are not shown to the listener but still go
                                // through afterMessageIsConsumed with the expired flag set.
                                boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
                                if (!expired) {
                                    listener.onMessage(message);
                                }
                                afterMessageIsConsumed(md, expired);
                            } catch (RuntimeException e) {
                                LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                    // schedule redelivery and possible DLQ processing
                                    md.setRollbackCause(e);
                                    rollback();
                                } else {
                                    // Transacted or Client ack: Deliver the
                                    // next message.
                                    afterMessageIsConsumed(md, false);
                                }
                            }
                        } else {
                            // Queueing path: no listener, or the channel is not running.
                            md.setDeliverySequenceId(-1); // skip duplicate check on subsequent queued delivery
                            if (md.getMessage() == null) {
                                // End of browse or pull request timeout.
                                unconsumedMessages.enqueue(md);
                            } else {
                                if (!consumeExpiredMessage(md)) {
                                    unconsumedMessages.enqueue(md);
                                    if (availableListener != null) {
                                        availableListener.onMessageAvailable(this);
                                    }
                                } else {
                                    // Not queued: run the consume callbacks with the expired flag set.
                                    beforeMessageIsConsumed(md);
                                    afterMessageIsConsumed(md, true);

                                    // Pull consumer needs to check if pull timed out and send
                                    // a new pull command if not.
                                    if (info.getCurrentPrefetchSize() == 0) {
                                        unconsumedMessages.enqueue(null);
                                    }
                                }
                            }
                        }
                    } else {
                        // deal with duplicate delivery
                        ConsumerId consumerWithPendingTransaction;
                        if (redeliveryExpectedInCurrentTransaction(md, true)) {
                            // This consumer already tracks the message for its own transaction:
                            // mark it as received again and ack it.
                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
                            if (transactedIndividualAck) {
                                immediateIndividualTransactedAck(md);
                            } else {
                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
                            }
                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
                            // NOTE(review): rollbackDuplicate presumably clears the duplicate record so
                            // that the recursive dispatch below takes the non-duplicate branch - confirm.
                            session.getConnection().rollbackDuplicate(this, md.getMessage());
                            dispatch(md);
                        } else {
                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
                        }
                    }
                }
            }
            // Every 1000th dispatch, reset the counter and yield the current thread.
            if (++dispatchedCount % 1000 == 0) {
                dispatchedCount = 0;
                Thread.yield();
            }
        } catch (Exception e) {
            session.connection.onClientInternalException(e);
        }
    }
1483
1484    private boolean redeliveryExpectedInCurrentTransaction(MessageDispatch md, boolean markReceipt) {
1485        if (session.isTransacted()) {
1486            synchronized (deliveredMessages) {
1487                if (previouslyDeliveredMessages != null) {
1488                    PreviouslyDelivered entry;
1489                    if ((entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId())) != null) {
1490                        if (markReceipt) {
1491                            entry.redelivered = true;
1492                        }
1493                        return true;
1494                    }
1495                }
1496            }
1497        }
1498        return false;
1499    }
1500
1501    private ConsumerId redeliveryPendingInCompetingTransaction(MessageDispatch md) {
1502        for (ActiveMQSession activeMQSession: session.connection.getSessions()) {
1503            for (ActiveMQMessageConsumer activeMQMessageConsumer : activeMQSession.consumers) {
1504                if (activeMQMessageConsumer.redeliveryExpectedInCurrentTransaction(md, false)) {
1505                    return activeMQMessageConsumer.getConsumerId();
1506                }
1507            }
1508        }
1509        return null;
1510    }
1511
1512    // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1513    private void clearDeliveredList() {
1514        if (clearDeliveredList) {
1515            synchronized (deliveredMessages) {
1516                if (clearDeliveredList) {
1517                    if (!deliveredMessages.isEmpty()) {
1518                        if (session.isTransacted()) {
1519                            captureDeliveredMessagesForDuplicateSuppression();
1520                        } else {
1521                            if (session.isClientAcknowledge()) {
1522                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
1523                                // allow redelivery
1524                                if (!this.info.isBrowser()) {
1525                                    for (MessageDispatch md: deliveredMessages) {
1526                                        this.session.connection.rollbackDuplicate(this, md.getMessage());
1527                                    }
1528                                }
1529                            }
1530                            LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
1531                            deliveredMessages.clear();
1532                            pendingAck = null;
1533                        }
1534                    }
1535                    clearDeliveredList = false;
1536                }
1537            }
1538        }
1539    }
1540
1541    // called with deliveredMessages locked
1542    private void captureDeliveredMessagesForDuplicateSuppression() {
1543        captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery (true);
1544    }
1545
1546    private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedelivery(boolean requireRedelivery) {
1547        if (previouslyDeliveredMessages == null) {
1548            previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>(session.getTransactionContext().getTransactionId());
1549        }
1550        for (MessageDispatch delivered : deliveredMessages) {
1551            previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), new PreviouslyDelivered(delivered, !requireRedelivery));
1552        }
1553        LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
1554    }
1555
1556    public int getMessageSize() {
1557        return unconsumedMessages.size();
1558    }
1559
1560    public void start() throws JMSException {
1561        if (unconsumedMessages.isClosed()) {
1562            return;
1563        }
1564        started.set(true);
1565        unconsumedMessages.start();
1566        session.executor.wakeup();
1567    }
1568
1569    public void stop() {
1570        started.set(false);
1571        unconsumedMessages.stop();
1572    }
1573
1574    @Override
1575    public String toString() {
1576        return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1577               + " }";
1578    }
1579
1580    /**
1581     * Delivers a message to the message listener.
1582     *
1583     * @return
1584     * @throws JMSException
1585     */
1586    public boolean iterate() {
1587        MessageListener listener = this.messageListener.get();
1588        if (listener != null) {
1589            MessageDispatch md = unconsumedMessages.dequeueNoWait();
1590            if (md != null) {
1591                dispatch(md);
1592                return true;
1593            }
1594        }
1595        return false;
1596    }
1597
1598    public boolean isInUse(ActiveMQTempDestination destination) {
1599        return info.getDestination().equals(destination);
1600    }
1601
1602    public long getLastDeliveredSequenceId() {
1603        return lastDeliveredSequenceId;
1604    }
1605
1606    public IOException getFailureError() {
1607        return failureError;
1608    }
1609
    /**
     * @param failureError the failure error to store on this consumer
     */
    public void setFailureError(IOException failureError) {
        this.failureError = failureError;
    }
1613
1614    /**
1615     * @return the optimizedAckScheduledAckInterval
1616     */
1617    public long getOptimizedAckScheduledAckInterval() {
1618        return optimizedAckScheduledAckInterval;
1619    }
1620
1621    /**
1622     * @param optimizedAckScheduledAckInterval the optimizedAckScheduledAckInterval to set
1623     */
1624    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) throws JMSException {
1625        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1626
1627        if (this.optimizedAckTask != null) {
1628            try {
1629                this.session.connection.getScheduler().cancel(optimizedAckTask);
1630            } catch (JMSException e) {
1631                LOG.debug("Caught exception while cancelling old optimized ack task", e);
1632                throw e;
1633            }
1634            this.optimizedAckTask = null;
1635        }
1636
1637        // Should we periodically send out all outstanding acks.
1638        if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0) {
1639            this.optimizedAckTask = new Runnable() {
1640
1641                @Override
1642                public void run() {
1643                    try {
1644                        if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
1645                            LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
1646                            deliverAcks();
1647                        }
1648                    } catch (Exception e) {
1649                        LOG.debug("Optimized Ack Task caught exception during ack", e);
1650                    }
1651                }
1652            };
1653
1654            try {
1655                this.session.connection.getScheduler().executePeriodically(optimizedAckTask, optimizedAckScheduledAckInterval);
1656            } catch (JMSException e) {
1657                LOG.debug("Caught exception while scheduling new optimized ack task", e);
1658                throw e;
1659            }
1660        }
1661    }
1662
1663    public boolean hasMessageListener() {
1664        return messageListener.get() != null;
1665    }
1666
1667    public boolean isConsumerExpiryCheckEnabled() {
1668        return consumerExpiryCheckEnabled;
1669    }
1670
    /**
     * @param consumerExpiryCheckEnabled the new value of the {@code consumerExpiryCheckEnabled} flag
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }
1674}