001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import javax.jms.InvalidSelectorException;
030import javax.jms.JMSException;
031
032import org.apache.activemq.broker.Broker;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
035import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
036import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
037import org.apache.activemq.broker.region.policy.PolicyEntry;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ConsumerInfo;
040import org.apache.activemq.command.Message;
041import org.apache.activemq.command.MessageAck;
042import org.apache.activemq.command.MessageDispatch;
043import org.apache.activemq.command.MessageId;
044import org.apache.activemq.store.TopicMessageStore;
045import org.apache.activemq.transaction.Synchronization;
046import org.apache.activemq.usage.SystemUsage;
047import org.apache.activemq.usage.Usage;
048import org.apache.activemq.usage.UsageListener;
049import org.apache.activemq.util.SubscriptionKey;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
054
055    private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
056    private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
057    private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
058    private final SubscriptionKey subscriptionKey;
059    private final boolean keepDurableSubsActive;
060    private boolean enableMessageExpirationOnActiveDurableSubs;
061    private final AtomicBoolean active = new AtomicBoolean();
062    private final AtomicLong offlineTimestamp = new AtomicLong(-1);
063    private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>();
064
065    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
066            throws JMSException {
067        super(broker, usageManager, context, info);
068        this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
069        this.pending.setSystemUsage(usageManager);
070        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
071        this.keepDurableSubsActive = keepDurableSubsActive;
072        this.enableMessageExpirationOnActiveDurableSubs = broker.getBrokerService().isEnableMessageExpirationOnActiveDurableSubs();
073        subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
074    }
075
076    public final boolean isActive() {
077        return active.get();
078    }
079
080    public final long getOfflineTimestamp() {
081        return offlineTimestamp.get();
082    }
083
084    public void setOfflineTimestamp(long timestamp) {
085        offlineTimestamp.set(timestamp);
086    }
087
088    @Override
089    public boolean isFull() {
090        return !active.get() || super.isFull();
091    }
092
093    @Override
094    public void gc() {
095    }
096
097    /**
098     * store will have a pending ack for all durables, irrespective of the
099     * selector so we need to ack if node is un-matched
100     */
101    @Override
102    public void unmatched(MessageReference node) throws IOException {
103        MessageAck ack = new MessageAck();
104        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
105        ack.setMessageID(node.getMessageId());
106        Destination regionDestination = (Destination) node.getRegionDestination();
107        regionDestination.acknowledge(this.getContext(), this, ack, node);
108    }
109
110    @Override
111    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
112        // statically configured via maxPageSize
113    }
114
115    @Override
116    public void add(ConnectionContext context, Destination destination) throws Exception {
117        if (!destinations.contains(destination)) {
118            super.add(context, destination);
119        }
120        // do it just once per destination
121        if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
122            return;
123        }
124        durableDestinations.put(destination.getActiveMQDestination(), destination);
125
126        if (active.get() || keepDurableSubsActive) {
127            Topic topic = (Topic) destination;
128            topic.activate(context, this);
129            getSubscriptionStatistics().getEnqueues().add(pending.size());
130        } else if (destination.getMessageStore() != null) {
131            TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
132            try {
133                getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
134            } catch (IOException e) {
135                JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
136                jmsEx.setLinkedException(e);
137                throw jmsEx;
138            }
139        }
140        dispatchPending();
141    }
142
143    // used by RetaineMessageSubscriptionRecoveryPolicy
144    public boolean isEmpty(Topic topic) {
145        return pending.isEmpty(topic);
146    }
147
148    public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
149        if (!active.get()) {
150            this.context = context;
151            this.info = info;
152
153            LOG.debug("Activating {}", this);
154            if (!keepDurableSubsActive) {
155                for (Destination destination : durableDestinations.values()) {
156                    Topic topic = (Topic) destination;
157                    add(context, topic);
158                    topic.activate(context, this);
159                }
160
161                // On Activation we should update the configuration based on our new consumer info.
162                ActiveMQDestination dest = this.info.getDestination();
163                if (dest != null && regionBroker.getDestinationPolicy() != null) {
164                    PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
165                    if (entry != null) {
166                        entry.configure(broker, usageManager, this);
167                    }
168                }
169            }
170
171            synchronized (pendingLock) {
172                if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) {
173                    pending.setSystemUsage(memoryManager);
174                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
175                    pending.setMaxAuditDepth(getMaxAuditDepth());
176                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
177                    pending.start();
178                }
179                // use recovery policy every time sub is activated for retroactive topics and consumers
180                for (Destination destination : durableDestinations.values()) {
181                    Topic topic = (Topic) destination;
182                    if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
183                        topic.recoverRetroactiveMessages(context, this);
184                    }
185                }
186            }
187            this.active.set(true);
188            this.offlineTimestamp.set(-1);
189            dispatchPending();
190            this.usageManager.getMemoryUsage().addUsageListener(this);
191        }
192    }
193
194    public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception {
195        LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
196        active.set(false);
197        offlineTimestamp.set(System.currentTimeMillis());
198        this.usageManager.getMemoryUsage().removeUsageListener(this);
199
200        ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
201        List<MessageReference> savedDispateched = null;
202
203        synchronized (pendingLock) {
204            if (!keepDurableSubsActive) {
205                pending.stop();
206            }
207
208            synchronized (dispatchLock) {
209                for (Destination destination : durableDestinations.values()) {
210                    Topic topic = (Topic) destination;
211                    if (!keepDurableSubsActive) {
212                        topicsToDeactivate.add(topic);
213                    } else {
214                        topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
215                    }
216                }
217
218                // Before we add these back to pending they need to be in producer order not
219                // dispatch order so we can add them to the front of the pending list.
220                Collections.reverse(dispatched);
221
222                for (final MessageReference node : dispatched) {
223                    // Mark the dispatched messages as redelivered for next time.
224                    if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
225                        Integer count = redeliveredMessages.get(node.getMessageId());
226                        if (count != null) {
227                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
228                        } else {
229                            redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
230                        }
231                    }
232                    if (keepDurableSubsActive && pending.isTransient()) {
233                        pending.addMessageFirst(node);
234                        pending.rollback(node.getMessageId());
235                    }
236                    // createMessageDispatch increments on remove from pending for dispatch
237                    node.decrementReferenceCount();
238                }
239
240                if (!topicsToDeactivate.isEmpty()) {
241                    savedDispateched = new ArrayList<MessageReference>(dispatched);
242                }
243                dispatched.clear();
244                getSubscriptionStatistics().getInflightMessageSize().reset();
245            }
246            if (!keepDurableSubsActive && pending.isTransient()) {
247                try {
248                    pending.reset();
249                    while (pending.hasNext()) {
250                        MessageReference node = pending.next();
251                        node.decrementReferenceCount();
252                        pending.remove();
253                    }
254                } finally {
255                    pending.release();
256                }
257            }
258        }
259        for(Topic topic: topicsToDeactivate) {
260            topic.deactivate(context, this, savedDispateched);
261        }
262        prefetchExtension.set(0);
263    }
264
265    @Override
266    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
267        MessageDispatch md = super.createMessageDispatch(node, message);
268        if (node != QueueMessageReference.NULL_MESSAGE) {
269            node.incrementReferenceCount();
270            Integer count = redeliveredMessages.get(node.getMessageId());
271            if (count != null) {
272                md.setRedeliveryCounter(count.intValue());
273            }
274        }
275        return md;
276    }
277
278    @Override
279    public void add(MessageReference node) throws Exception {
280        if (!active.get() && !keepDurableSubsActive) {
281            return;
282        }
283        super.add(node);
284    }
285
286    @Override
287    public void dispatchPending() throws IOException {
288        if (isActive()) {
289            super.dispatchPending();
290        }
291    }
292
293    public void removePending(MessageReference node) throws IOException {
294        pending.remove(node);
295    }
296
297    @Override
298    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
299        synchronized (pending) {
300            pending.addRecoveredMessage(message);
301        }
302    }
303
304    @Override
305    public int getPendingQueueSize() {
306        if (active.get() || keepDurableSubsActive) {
307            return super.getPendingQueueSize();
308        }
309        // TODO: need to get from store
310        return 0;
311    }
312
313    @Override
314    public void setSelector(String selector) throws InvalidSelectorException {
315        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
316    }
317
318    @Override
319    protected boolean canDispatch(MessageReference node) {
320        return true;  // let them go, our dispatchPending gates the active / inactive state.
321    }
322
323    @Override
324    protected boolean trackedInPendingTransaction(MessageReference node) {
325        return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId());
326    }
327
328    @Override
329    protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException {
330        this.setTimeOfLastMessageAck(System.currentTimeMillis());
331        Destination regionDestination = (Destination) node.getRegionDestination();
332        regionDestination.acknowledge(context, this, ack, node);
333        redeliveredMessages.remove(node.getMessageId());
334        node.decrementReferenceCount();
335        if (context.isInTransaction() && context.getTransaction().getTransactionId().isXATransaction()) {
336            context.getTransaction().addSynchronization(new Synchronization() {
337
338                @Override
339                public void beforeCommit() throws Exception {
340                    // post xa prepare call
341                    synchronized (pendingLock) {
342                        ackedAndPrepared.add(node.getMessageId());
343                    }
344                }
345
346                @Override
347                public void afterCommit() throws Exception {
348                    synchronized (pendingLock) {
349                        // may be in the cursor post activate/load from the store
350                        pending.remove(node);
351                        ackedAndPrepared.remove(node.getMessageId());
352                    }
353                }
354
355                @Override
356                public void afterRollback() throws Exception {
357                    synchronized (pendingLock) {
358                        ackedAndPrepared.remove(node.getMessageId());
359                    }
360                    dispatchPending();
361                }
362            });
363        }
364        ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
365        if (info.isNetworkSubscription()) {
366            ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
367        }
368    }
369
370    @Override
371    public synchronized String toString() {
372        return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
373                + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
374                + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
375    }
376
377    public SubscriptionKey getSubscriptionKey() {
378        return subscriptionKey;
379    }
380
381    /**
382     * Release any references that we are holding.
383     */
384    @Override
385    public void destroy() {
386        synchronized (pendingLock) {
387            try {
388                pending.reset();
389                while (pending.hasNext()) {
390                    MessageReference node = pending.next();
391                    node.decrementReferenceCount();
392                }
393
394            } finally {
395                pending.release();
396                pending.clear();
397            }
398        }
399        synchronized (dispatchLock) {
400            for (MessageReference node : dispatched) {
401                node.decrementReferenceCount();
402            }
403            dispatched.clear();
404            ackedAndPrepared.clear();
405        }
406        setSlowConsumer(false);
407    }
408
409    @Override
410    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
411        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
412            try {
413                dispatchPending();
414            } catch (IOException e) {
415                LOG.warn("problem calling dispatchMatched", e);
416            }
417        }
418    }
419
420    @Override
421    protected boolean isDropped(MessageReference node) {
422        return false;
423    }
424
425    public boolean isKeepDurableSubsActive() {
426        return keepDurableSubsActive;
427    }
428
429    public boolean isEnableMessageExpirationOnActiveDurableSubs() {
430        return enableMessageExpirationOnActiveDurableSubs;
431    }
432}