001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.List; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.concurrent.atomic.AtomicLong; 027 028import javax.jms.InvalidSelectorException; 029import javax.jms.JMSException; 030 031import org.apache.activemq.broker.Broker; 032import org.apache.activemq.broker.ConnectionContext; 033import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; 034import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 035import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; 036import org.apache.activemq.broker.region.policy.PolicyEntry; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ConsumerInfo; 039import org.apache.activemq.command.Message; 040import org.apache.activemq.command.MessageAck; 041import org.apache.activemq.command.MessageDispatch; 042import org.apache.activemq.command.MessageId; 043import org.apache.activemq.store.TopicMessageStore; 044import org.apache.activemq.usage.SystemUsage; 045import org.apache.activemq.usage.Usage; 046import org.apache.activemq.usage.UsageListener; 047import org.apache.activemq.util.SubscriptionKey; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { 052 053 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); 054 private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); 055 private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 056 private final SubscriptionKey subscriptionKey; 057 private final boolean keepDurableSubsActive; 058 private final AtomicBoolean active = new AtomicBoolean(); 059 private final AtomicLong offlineTimestamp = new AtomicLong(-1); 060 061 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) 062 throws JMSException { 063 super(broker, usageManager, context, info); 064 this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); 065 this.pending.setSystemUsage(usageManager); 066 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 067 this.keepDurableSubsActive = keepDurableSubsActive; 068 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 069 } 070 071 public final boolean isActive() { 072 return active.get(); 073 } 074 075 public final long getOfflineTimestamp() { 076 return offlineTimestamp.get(); 077 } 078 079 public void setOfflineTimestamp(long timestamp) { 080 offlineTimestamp.set(timestamp); 081 } 082 083 @Override 084 public boolean isFull() { 085 return !active.get() || super.isFull(); 086 } 087 088 @Override 089 public void gc() { 090 } 091 092 /** 093 * store will have a pending ack for all durables, irrespective of the 094 * selector so we need to ack if node is un-matched 095 */ 096 @Override 097 public void unmatched(MessageReference node) throws IOException { 098 MessageAck ack = new MessageAck(); 099 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); 100 ack.setMessageID(node.getMessageId()); 101 Destination regionDestination = (Destination) node.getRegionDestination(); 102 regionDestination.acknowledge(this.getContext(), this, ack, node); 103 } 104 105 @Override 106 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 107 // statically configured via maxPageSize 108 } 109 110 @Override 111 public void add(ConnectionContext context, Destination destination) throws Exception { 112 if (!destinations.contains(destination)) { 113 super.add(context, destination); 114 } 115 // do it just once per destination 116 if (durableDestinations.containsKey(destination.getActiveMQDestination())) { 117 return; 118 } 119 durableDestinations.put(destination.getActiveMQDestination(), destination); 120 121 if (active.get() || keepDurableSubsActive) { 122 Topic topic = (Topic) destination; 123 topic.activate(context, this); 124 getSubscriptionStatistics().getEnqueues().add(pending.size()); 125 } else if (destination.getMessageStore() != null) { 126 TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); 127 try { 128 getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName())); 129 } catch (IOException e) { 130 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e); 131 jmsEx.setLinkedException(e); 132 throw jmsEx; 133 } 134 } 135 dispatchPending(); 136 } 137 138 // used by RetaineMessageSubscriptionRecoveryPolicy 139 public boolean isEmpty(Topic topic) { 140 return pending.isEmpty(topic); 141 } 142 143 public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception { 144 if (!active.get()) { 145 this.context = context; 146 this.info = info; 147 148 LOG.debug("Activating {}", this); 149 if (!keepDurableSubsActive) { 150 for (Destination destination : durableDestinations.values()) { 151 Topic topic = (Topic) destination; 152 add(context, topic); 153 topic.activate(context, this); 154 } 155 156 // On Activation we should update the configuration based on our new consumer info. 157 ActiveMQDestination dest = this.info.getDestination(); 158 if (dest != null && regionBroker.getDestinationPolicy() != null) { 159 PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest); 160 if (entry != null) { 161 entry.configure(broker, usageManager, this); 162 } 163 } 164 } 165 166 synchronized (pendingLock) { 167 if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) { 168 pending.setSystemUsage(memoryManager); 169 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 170 pending.setMaxAuditDepth(getMaxAuditDepth()); 171 pending.setMaxProducersToAudit(getMaxProducersToAudit()); 172 pending.start(); 173 } 174 // use recovery policy every time sub is activated for retroactive topics and consumers 175 for (Destination destination : durableDestinations.values()) { 176 Topic topic = (Topic) destination; 177 if (topic.isAlwaysRetroactive() || info.isRetroactive()) { 178 topic.recoverRetroactiveMessages(context, this); 179 } 180 } 181 } 182 this.active.set(true); 183 this.offlineTimestamp.set(-1); 184 dispatchPending(); 185 this.usageManager.getMemoryUsage().addUsageListener(this); 186 } 187 } 188 189 public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception { 190 LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this); 191 active.set(false); 192 offlineTimestamp.set(System.currentTimeMillis()); 193 this.usageManager.getMemoryUsage().removeUsageListener(this); 194 195 ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>(); 196 List<MessageReference> savedDispateched = null; 197 198 synchronized (pendingLock) { 199 if (!keepDurableSubsActive) { 200 pending.stop(); 201 } 202 203 synchronized (dispatchLock) { 204 for (Destination destination : durableDestinations.values()) { 205 Topic topic = (Topic) destination; 206 if (!keepDurableSubsActive) { 207 topicsToDeactivate.add(topic); 208 } else { 209 topic.getDestinationStatistics().getInflight().subtract(dispatched.size()); 210 } 211 } 212 213 // Before we add these back to pending they need to be in producer order not 214 // dispatch order so we can add them to the front of the pending list. 215 Collections.reverse(dispatched); 216 217 for (final MessageReference node : dispatched) { 218 // Mark the dispatched messages as redelivered for next time. 219 if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) { 220 Integer count = redeliveredMessages.get(node.getMessageId()); 221 if (count != null) { 222 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); 223 } else { 224 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); 225 } 226 } 227 if (keepDurableSubsActive && pending.isTransient()) { 228 pending.addMessageFirst(node); 229 pending.rollback(node.getMessageId()); 230 } 231 // createMessageDispatch increments on remove from pending for dispatch 232 node.decrementReferenceCount(); 233 } 234 235 if (!topicsToDeactivate.isEmpty()) { 236 savedDispateched = new ArrayList<MessageReference>(dispatched); 237 } 238 dispatched.clear(); 239 getSubscriptionStatistics().getInflightMessageSize().reset(); 240 } 241 if (!keepDurableSubsActive && pending.isTransient()) { 242 try { 243 pending.reset(); 244 while (pending.hasNext()) { 245 MessageReference node = pending.next(); 246 node.decrementReferenceCount(); 247 pending.remove(); 248 } 249 } finally { 250 pending.release(); 251 } 252 } 253 } 254 for(Topic topic: topicsToDeactivate) { 255 topic.deactivate(context, this, savedDispateched); 256 } 257 prefetchExtension.set(0); 258 } 259 260 @Override 261 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 262 MessageDispatch md = super.createMessageDispatch(node, message); 263 if (node != QueueMessageReference.NULL_MESSAGE) { 264 node.incrementReferenceCount(); 265 Integer count = redeliveredMessages.get(node.getMessageId()); 266 if (count != null) { 267 md.setRedeliveryCounter(count.intValue()); 268 } 269 } 270 return md; 271 } 272 273 @Override 274 public void add(MessageReference node) throws Exception { 275 if (!active.get() && !keepDurableSubsActive) { 276 return; 277 } 278 super.add(node); 279 } 280 281 @Override 282 public void dispatchPending() throws IOException { 283 if (isActive()) { 284 super.dispatchPending(); 285 } 286 } 287 288 public void removePending(MessageReference node) throws IOException { 289 pending.remove(node); 290 } 291 292 @Override 293 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 294 synchronized (pending) { 295 pending.addRecoveredMessage(message); 296 } 297 } 298 299 @Override 300 public int getPendingQueueSize() { 301 if (active.get() || keepDurableSubsActive) { 302 return super.getPendingQueueSize(); 303 } 304 // TODO: need to get from store 305 return 0; 306 } 307 308 @Override 309 public void setSelector(String selector) throws InvalidSelectorException { 310 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); 311 } 312 313 @Override 314 protected boolean canDispatch(MessageReference node) { 315 return true; // let them go, our dispatchPending gates the active / inactive state. 316 } 317 318 @Override 319 protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { 320 this.setTimeOfLastMessageAck(System.currentTimeMillis()); 321 Destination regionDestination = (Destination) node.getRegionDestination(); 322 regionDestination.acknowledge(context, this, ack, node); 323 redeliveredMessages.remove(node.getMessageId()); 324 node.decrementReferenceCount(); 325 ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); 326 if (info.isNetworkSubscription()) { 327 ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); 328 } 329 } 330 331 @Override 332 public synchronized String toString() { 333 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" 334 + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() 335 + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 336 } 337 338 public SubscriptionKey getSubscriptionKey() { 339 return subscriptionKey; 340 } 341 342 /** 343 * Release any references that we are holding. 344 */ 345 @Override 346 public void destroy() { 347 synchronized (pendingLock) { 348 try { 349 pending.reset(); 350 while (pending.hasNext()) { 351 MessageReference node = pending.next(); 352 node.decrementReferenceCount(); 353 } 354 355 } finally { 356 pending.release(); 357 pending.clear(); 358 } 359 } 360 synchronized (dispatchLock) { 361 for (MessageReference node : dispatched) { 362 node.decrementReferenceCount(); 363 } 364 dispatched.clear(); 365 } 366 setSlowConsumer(false); 367 } 368 369 @Override 370 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 371 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { 372 try { 373 dispatchPending(); 374 } catch (IOException e) { 375 LOG.warn("problem calling dispatchMatched", e); 376 } 377 } 378 } 379 380 @Override 381 protected boolean isDropped(MessageReference node) { 382 return false; 383 } 384 385 public boolean isKeepDurableSubsActive() { 386 return keepDurableSubsActive; 387 } 388}