001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.List; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import javax.jms.InvalidSelectorException; 030import javax.jms.JMSException; 031 032import org.apache.activemq.broker.Broker; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; 035import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 036import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; 037import org.apache.activemq.broker.region.policy.PolicyEntry; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ConsumerInfo; 040import org.apache.activemq.command.Message; 041import org.apache.activemq.command.MessageAck; 042import org.apache.activemq.command.MessageDispatch; 043import org.apache.activemq.command.MessageId; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.transaction.Synchronization; 046import org.apache.activemq.usage.SystemUsage; 047import org.apache.activemq.usage.Usage; 048import org.apache.activemq.usage.UsageListener; 049import org.apache.activemq.util.SubscriptionKey; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { 054 055 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); 056 private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); 057 private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 058 private final SubscriptionKey subscriptionKey; 059 private final boolean keepDurableSubsActive; 060 private boolean enableMessageExpirationOnActiveDurableSubs; 061 private final AtomicBoolean active = new AtomicBoolean(); 062 private final AtomicLong offlineTimestamp = new AtomicLong(-1); 063 private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>(); 064 065 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) 066 throws JMSException { 067 super(broker, usageManager, context, info); 068 this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); 069 this.pending.setSystemUsage(usageManager); 070 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 071 this.keepDurableSubsActive = keepDurableSubsActive; 072 this.enableMessageExpirationOnActiveDurableSubs = broker.getBrokerService().isEnableMessageExpirationOnActiveDurableSubs(); 073 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 074 } 075 076 public final boolean isActive() { 077 return active.get(); 078 } 079 080 public final long getOfflineTimestamp() { 081 return offlineTimestamp.get(); 082 } 083 084 public void setOfflineTimestamp(long timestamp) { 085 offlineTimestamp.set(timestamp); 086 } 087 088 @Override 089 public boolean isFull() { 090 return !active.get() || super.isFull(); 091 } 092 093 @Override 094 public void gc() { 095 } 096 097 /** 098 * store will have a pending ack for all durables, irrespective of the 099 * selector so we need to ack if node is un-matched 100 */ 101 @Override 102 public void unmatched(MessageReference node) throws IOException { 103 MessageAck ack = new MessageAck(); 104 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); 105 ack.setMessageID(node.getMessageId()); 106 Destination regionDestination = (Destination) node.getRegionDestination(); 107 regionDestination.acknowledge(this.getContext(), this, ack, node); 108 } 109 110 @Override 111 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 112 // statically configured via maxPageSize 113 } 114 115 @Override 116 public void add(ConnectionContext context, Destination destination) throws Exception { 117 if (!destinations.contains(destination)) { 118 super.add(context, destination); 119 } 120 // do it just once per destination 121 if (durableDestinations.containsKey(destination.getActiveMQDestination())) { 122 return; 123 } 124 durableDestinations.put(destination.getActiveMQDestination(), destination); 125 126 if (active.get() || keepDurableSubsActive) { 127 Topic topic = (Topic) destination; 128 topic.activate(context, this); 129 getSubscriptionStatistics().getEnqueues().add(pending.size()); 130 } else if (destination.getMessageStore() != null) { 131 TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); 132 try { 133 getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName())); 134 } catch (IOException e) { 135 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e); 136 jmsEx.setLinkedException(e); 137 throw jmsEx; 138 } 139 } 140 dispatchPending(); 141 } 142 143 // used by RetaineMessageSubscriptionRecoveryPolicy 144 public boolean isEmpty(Topic topic) { 145 return pending.isEmpty(topic); 146 } 147 148 public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception { 149 if (!active.get()) { 150 this.context = context; 151 this.info = info; 152 153 LOG.debug("Activating {}", this); 154 if (!keepDurableSubsActive) { 155 for (Destination destination : durableDestinations.values()) { 156 Topic topic = (Topic) destination; 157 add(context, topic); 158 topic.activate(context, this); 159 } 160 161 // On Activation we should update the configuration based on our new consumer info. 162 ActiveMQDestination dest = this.info.getDestination(); 163 if (dest != null && regionBroker.getDestinationPolicy() != null) { 164 PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest); 165 if (entry != null) { 166 entry.configure(broker, usageManager, this); 167 } 168 } 169 } 170 171 synchronized (pendingLock) { 172 if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) { 173 pending.setSystemUsage(memoryManager); 174 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 175 pending.setMaxAuditDepth(getMaxAuditDepth()); 176 pending.setMaxProducersToAudit(getMaxProducersToAudit()); 177 pending.start(); 178 } 179 // use recovery policy every time sub is activated for retroactive topics and consumers 180 for (Destination destination : durableDestinations.values()) { 181 Topic topic = (Topic) destination; 182 if (topic.isAlwaysRetroactive() || info.isRetroactive()) { 183 topic.recoverRetroactiveMessages(context, this); 184 } 185 } 186 } 187 this.active.set(true); 188 this.offlineTimestamp.set(-1); 189 dispatchPending(); 190 this.usageManager.getMemoryUsage().addUsageListener(this); 191 } 192 } 193 194 public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception { 195 LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this); 196 active.set(false); 197 offlineTimestamp.set(System.currentTimeMillis()); 198 this.usageManager.getMemoryUsage().removeUsageListener(this); 199 200 ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>(); 201 List<MessageReference> savedDispateched = null; 202 203 synchronized (pendingLock) { 204 if (!keepDurableSubsActive) { 205 pending.stop(); 206 } 207 208 synchronized (dispatchLock) { 209 for (Destination destination : durableDestinations.values()) { 210 Topic topic = (Topic) destination; 211 if (!keepDurableSubsActive) { 212 topicsToDeactivate.add(topic); 213 } else { 214 topic.getDestinationStatistics().getInflight().subtract(dispatched.size()); 215 } 216 } 217 218 // Before we add these back to pending they need to be in producer order not 219 // dispatch order so we can add them to the front of the pending list. 220 Collections.reverse(dispatched); 221 222 for (final MessageReference node : dispatched) { 223 // Mark the dispatched messages as redelivered for next time. 224 if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) { 225 Integer count = redeliveredMessages.get(node.getMessageId()); 226 if (count != null) { 227 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); 228 } else { 229 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); 230 } 231 } 232 if (keepDurableSubsActive && pending.isTransient()) { 233 pending.addMessageFirst(node); 234 pending.rollback(node.getMessageId()); 235 } 236 // createMessageDispatch increments on remove from pending for dispatch 237 node.decrementReferenceCount(); 238 } 239 240 if (!topicsToDeactivate.isEmpty()) { 241 savedDispateched = new ArrayList<MessageReference>(dispatched); 242 } 243 dispatched.clear(); 244 getSubscriptionStatistics().getInflightMessageSize().reset(); 245 } 246 if (!keepDurableSubsActive && pending.isTransient()) { 247 try { 248 pending.reset(); 249 while (pending.hasNext()) { 250 MessageReference node = pending.next(); 251 node.decrementReferenceCount(); 252 pending.remove(); 253 } 254 } finally { 255 pending.release(); 256 } 257 } 258 } 259 for(Topic topic: topicsToDeactivate) { 260 topic.deactivate(context, this, savedDispateched); 261 } 262 prefetchExtension.set(0); 263 } 264 265 @Override 266 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 267 MessageDispatch md = super.createMessageDispatch(node, message); 268 if (node != QueueMessageReference.NULL_MESSAGE) { 269 node.incrementReferenceCount(); 270 Integer count = redeliveredMessages.get(node.getMessageId()); 271 if (count != null) { 272 md.setRedeliveryCounter(count.intValue()); 273 } 274 } 275 return md; 276 } 277 278 @Override 279 public void add(MessageReference node) throws Exception { 280 if (!active.get() && !keepDurableSubsActive) { 281 return; 282 } 283 super.add(node); 284 } 285 286 @Override 287 public void dispatchPending() throws IOException { 288 if (isActive()) { 289 super.dispatchPending(); 290 } 291 } 292 293 public void removePending(MessageReference node) throws IOException { 294 pending.remove(node); 295 } 296 297 @Override 298 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 299 synchronized (pending) { 300 pending.addRecoveredMessage(message); 301 } 302 } 303 304 @Override 305 public int getPendingQueueSize() { 306 if (active.get() || keepDurableSubsActive) { 307 return super.getPendingQueueSize(); 308 } 309 // TODO: need to get from store 310 return 0; 311 } 312 313 @Override 314 public void setSelector(String selector) throws InvalidSelectorException { 315 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); 316 } 317 318 @Override 319 protected boolean canDispatch(MessageReference node) { 320 return true; // let them go, our dispatchPending gates the active / inactive state. 321 } 322 323 @Override 324 protected boolean trackedInPendingTransaction(MessageReference node) { 325 return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId()); 326 } 327 328 @Override 329 protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException { 330 this.setTimeOfLastMessageAck(System.currentTimeMillis()); 331 Destination regionDestination = (Destination) node.getRegionDestination(); 332 regionDestination.acknowledge(context, this, ack, node); 333 redeliveredMessages.remove(node.getMessageId()); 334 node.decrementReferenceCount(); 335 if (context.isInTransaction() && context.getTransaction().getTransactionId().isXATransaction()) { 336 context.getTransaction().addSynchronization(new Synchronization() { 337 338 @Override 339 public void beforeCommit() throws Exception { 340 // post xa prepare call 341 synchronized (pendingLock) { 342 ackedAndPrepared.add(node.getMessageId()); 343 } 344 } 345 346 @Override 347 public void afterCommit() throws Exception { 348 synchronized (pendingLock) { 349 // may be in the cursor post activate/load from the store 350 pending.remove(node); 351 ackedAndPrepared.remove(node.getMessageId()); 352 } 353 } 354 355 @Override 356 public void afterRollback() throws Exception { 357 synchronized (pendingLock) { 358 ackedAndPrepared.remove(node.getMessageId()); 359 } 360 dispatchPending(); 361 } 362 }); 363 } 364 ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); 365 if (info.isNetworkSubscription()) { 366 ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); 367 } 368 } 369 370 @Override 371 public synchronized String toString() { 372 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" 373 + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() 374 + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 375 } 376 377 public SubscriptionKey getSubscriptionKey() { 378 return subscriptionKey; 379 } 380 381 /** 382 * Release any references that we are holding. 383 */ 384 @Override 385 public void destroy() { 386 synchronized (pendingLock) { 387 try { 388 pending.reset(); 389 while (pending.hasNext()) { 390 MessageReference node = pending.next(); 391 node.decrementReferenceCount(); 392 } 393 394 } finally { 395 pending.release(); 396 pending.clear(); 397 } 398 } 399 synchronized (dispatchLock) { 400 for (MessageReference node : dispatched) { 401 node.decrementReferenceCount(); 402 } 403 dispatched.clear(); 404 ackedAndPrepared.clear(); 405 } 406 setSlowConsumer(false); 407 } 408 409 @Override 410 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 411 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { 412 try { 413 dispatchPending(); 414 } catch (IOException e) { 415 LOG.warn("problem calling dispatchMatched", e); 416 } 417 } 418 } 419 420 @Override 421 protected boolean isDropped(MessageReference node) { 422 return false; 423 } 424 425 public boolean isKeepDurableSubsActive() { 426 return keepDurableSubsActive; 427 } 428 429 public boolean isEnableMessageExpirationOnActiveDurableSubs() { 430 return enableMessageExpirationOnActiveDurableSubs; 431 } 432}