001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import javax.jms.InvalidSelectorException;
030import javax.jms.JMSException;
031
032import org.apache.activemq.broker.Broker;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
035import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
036import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
037import org.apache.activemq.broker.region.policy.PolicyEntry;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ConsumerInfo;
040import org.apache.activemq.command.Message;
041import org.apache.activemq.command.MessageAck;
042import org.apache.activemq.command.MessageDispatch;
043import org.apache.activemq.command.MessageId;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.transaction.Synchronization;
046import org.apache.activemq.usage.SystemUsage;
047import org.apache.activemq.usage.Usage;
048import org.apache.activemq.usage.UsageListener;
049import org.apache.activemq.util.SubscriptionKey;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
054
055    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
056    private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
057    private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
058    private final SubscriptionKey subscriptionKey;
059    private final boolean keepDurableSubsActive;
060    private final AtomicBoolean active = new AtomicBoolean();
061    private final AtomicLong offlineTimestamp = new AtomicLong(-1);
062    private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>();
063
064    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
065            throws JMSException {
066        super(broker, usageManager, context, info);
067        this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
068        this.pending.setSystemUsage(usageManager);
069        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
070        this.keepDurableSubsActive = keepDurableSubsActive;
071        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
072    }
073
074    public final boolean isActive() {
075        return active.get();
076    }
077
078    public final long getOfflineTimestamp() {
079        return offlineTimestamp.get();
080    }
081
082    public void setOfflineTimestamp(long timestamp) {
083        offlineTimestamp.set(timestamp);
084    }
085
086    @Override
087    public boolean isFull() {
088        return !active.get() || super.isFull();
089    }
090
091    @Override
092    public void gc() {
093    }
094
095    /**
096     * store will have a pending ack for all durables, irrespective of the
097     * selector so we need to ack if node is un-matched
098     */
099    @Override
100    public void unmatched(MessageReference node) throws IOException {
101        MessageAck ack = new MessageAck();
102        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
103        ack.setMessageID(node.getMessageId());
104        Destination regionDestination = (Destination) node.getRegionDestination();
105        regionDestination.acknowledge(this.getContext(), this, ack, node);
106    }
107
108    @Override
109    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
110        // statically configured via maxPageSize
111    }
112
113    @Override
114    public void add(ConnectionContext context, Destination destination) throws Exception {
115        if (!destinations.contains(destination)) {
116            super.add(context, destination);
117        }
118        // do it just once per destination
119        if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
120            return;
121        }
122        durableDestinations.put(destination.getActiveMQDestination(), destination);
123
124        if (active.get() || keepDurableSubsActive) {
125            Topic topic = (Topic) destination;
126            topic.activate(context, this);
127            getSubscriptionStatistics().getEnqueues().add(pending.size());
128        } else if (destination.getMessageStore() != null) {
129            TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
130            try {
131                getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
132            } catch (IOException e) {
133                JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
134                jmsEx.setLinkedException(e);
135                throw jmsEx;
136            }
137        }
138        dispatchPending();
139    }
140
141    // used by RetaineMessageSubscriptionRecoveryPolicy
142    public boolean isEmpty(Topic topic) {
143        return pending.isEmpty(topic);
144    }
145
146    public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
147        if (!active.get()) {
148            this.context = context;
149            this.info = info;
150
151            LOG.debug("Activating {}", this);
152            if (!keepDurableSubsActive) {
153                for (Destination destination : durableDestinations.values()) {
154                    Topic topic = (Topic) destination;
155                    add(context, topic);
156                    topic.activate(context, this);
157                }
158
159                // On Activation we should update the configuration based on our new consumer info.
160                ActiveMQDestination dest = this.info.getDestination();
161                if (dest != null && regionBroker.getDestinationPolicy() != null) {
162                    PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
163                    if (entry != null) {
164                        entry.configure(broker, usageManager, this);
165                    }
166                }
167            }
168
169            synchronized (pendingLock) {
170                if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) {
171                    pending.setSystemUsage(memoryManager);
172                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
173                    pending.setMaxAuditDepth(getMaxAuditDepth());
174                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
175                    pending.start();
176                }
177                // use recovery policy every time sub is activated for retroactive topics and consumers
178                for (Destination destination : durableDestinations.values()) {
179                    Topic topic = (Topic) destination;
180                    if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
181                        topic.recoverRetroactiveMessages(context, this);
182                    }
183                }
184            }
185            this.active.set(true);
186            this.offlineTimestamp.set(-1);
187            dispatchPending();
188            this.usageManager.getMemoryUsage().addUsageListener(this);
189        }
190    }
191
192    public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception {
193        LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
194        active.set(false);
195        offlineTimestamp.set(System.currentTimeMillis());
196        this.usageManager.getMemoryUsage().removeUsageListener(this);
197
198        ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
199        List<MessageReference> savedDispateched = null;
200
201        synchronized (pendingLock) {
202            if (!keepDurableSubsActive) {
203                pending.stop();
204            }
205
206            synchronized (dispatchLock) {
207                for (Destination destination : durableDestinations.values()) {
208                    Topic topic = (Topic) destination;
209                    if (!keepDurableSubsActive) {
210                        topicsToDeactivate.add(topic);
211                    } else {
212                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
213                    }
214                }
215
216                // Before we add these back to pending they need to be in producer order not
217                // dispatch order so we can add them to the front of the pending list.
218                Collections.reverse(dispatched);
219
220                for (final MessageReference node : dispatched) {
221                    // Mark the dispatched messages as redelivered for next time.
222                    if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
223                        Integer count = redeliveredMessages.get(node.getMessageId());
224                        if (count != null) {
225                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
226                        } else {
227                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
228                        }
229                    }
230                    if (keepDurableSubsActive && pending.isTransient()) {
231                        pending.addMessageFirst(node);
232                        pending.rollback(node.getMessageId());
233                    }
234                    // createMessageDispatch increments on remove from pending for dispatch
235                    node.decrementReferenceCount();
236                }
237
238                if (!topicsToDeactivate.isEmpty()) {
239                    savedDispateched = new ArrayList<MessageReference>(dispatched);
240                }
241                dispatched.clear();
242                getSubscriptionStatistics().getInflightMessageSize().reset();
243            }
244            if (!keepDurableSubsActive && pending.isTransient()) {
245                try {
246                    pending.reset();
247                    while (pending.hasNext()) {
248                        MessageReference node = pending.next();
249                        node.decrementReferenceCount();
250                        pending.remove();
251                    }
252                } finally {
253                    pending.release();
254                }
255            }
256        }
257        for(Topic topic: topicsToDeactivate) {
258            topic.deactivate(context, this, savedDispateched);
259        }
260        prefetchExtension.set(0);
261    }
262
263    @Override
264    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
265        MessageDispatch md = super.createMessageDispatch(node, message);
266        if (node != QueueMessageReference.NULL_MESSAGE) {
267            node.incrementReferenceCount();
268            Integer count = redeliveredMessages.get(node.getMessageId());
269            if (count != null) {
270                md.setRedeliveryCounter(count.intValue());
271            }
272        }
273        return md;
274    }
275
276    @Override
277    public void add(MessageReference node) throws Exception {
278        if (!active.get() && !keepDurableSubsActive) {
279            return;
280        }
281        super.add(node);
282    }
283
284    @Override
285    public void dispatchPending() throws IOException {
286        if (isActive()) {
287            super.dispatchPending();
288        }
289    }
290
291    public void removePending(MessageReference node) throws IOException {
292        pending.remove(node);
293    }
294
295    @Override
296    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
297        synchronized (pending) {
298            pending.addRecoveredMessage(message);
299        }
300    }
301
302    @Override
303    public int getPendingQueueSize() {
304        if (active.get() || keepDurableSubsActive) {
305            return super.getPendingQueueSize();
306        }
307        // TODO: need to get from store
308        return 0;
309    }
310
311    @Override
312    public void setSelector(String selector) throws InvalidSelectorException {
313        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
314    }
315
316    @Override
317    protected boolean canDispatch(MessageReference node) {
318        return true;  // let them go, our dispatchPending gates the active / inactive state.
319    }
320
321    @Override
322    protected boolean trackedInPendingTransaction(MessageReference node) {
323        return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId());
324    }
325
326    @Override
327    protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException {
328        this.setTimeOfLastMessageAck(System.currentTimeMillis());
329        Destination regionDestination = (Destination) node.getRegionDestination();
330        regionDestination.acknowledge(context, this, ack, node);
331        redeliveredMessages.remove(node.getMessageId());
332        node.decrementReferenceCount();
333        if (context.isInTransaction() && context.getTransaction().getTransactionId().isXATransaction()) {
334            context.getTransaction().addSynchronization(new Synchronization() {
335
336                @Override
337                public void beforeCommit() throws Exception {
338                    // post xa prepare call
339                    synchronized (pendingLock) {
340                        ackedAndPrepared.add(node.getMessageId());
341                    }
342                }
343
344                @Override
345                public void afterCommit() throws Exception {
346                    synchronized (pendingLock) {
347                        // may be in the cursor post activate/load from the store
348                        pending.remove(node);
349                        ackedAndPrepared.remove(node.getMessageId());
350                    }
351                }
352
353                @Override
354                public void afterRollback() throws Exception {
355                    synchronized (pendingLock) {
356                        ackedAndPrepared.remove(node.getMessageId());
357                    }
358                    dispatchPending();
359                }
360            });
361        }
362        ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
363        if (info.isNetworkSubscription()) {
364            ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
365        }
366    }
367
368    @Override
369    public synchronized String toString() {
370        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
371                + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
372                + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
373    }
374
375    public SubscriptionKey getSubscriptionKey() {
376        return subscriptionKey;
377    }
378
379    /**
380     * Release any references that we are holding.
381     */
382    @Override
383    public void destroy() {
384        synchronized (pendingLock) {
385            try {
386                pending.reset();
387                while (pending.hasNext()) {
388                    MessageReference node = pending.next();
389                    node.decrementReferenceCount();
390                }
391
392            } finally {
393                pending.release();
394                pending.clear();
395            }
396        }
397        synchronized (dispatchLock) {
398            for (MessageReference node : dispatched) {
399                node.decrementReferenceCount();
400            }
401            dispatched.clear();
402            ackedAndPrepared.clear();
403        }
404        setSlowConsumer(false);
405    }
406
407    @Override
408    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
409        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
410            try {
411                dispatchPending();
412            } catch (IOException e) {
413                LOG.warn("problem calling dispatchMatched", e);
414            }
415        }
416    }
417
418    @Override
419    protected boolean isDropped(MessageReference node) {
420        return false;
421    }
422
423    public boolean isKeepDurableSubsActive() {
424        return keepDurableSubsActive;
425    }
426}