001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.List; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import javax.jms.InvalidSelectorException; 030import javax.jms.JMSException; 031 032import org.apache.activemq.broker.Broker; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; 035import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 036import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; 037import org.apache.activemq.broker.region.policy.PolicyEntry; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ConsumerInfo; 040import org.apache.activemq.command.Message; 041import org.apache.activemq.command.MessageAck; 042import org.apache.activemq.command.MessageDispatch; 043import org.apache.activemq.command.MessageId; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.transaction.Synchronization; 046import org.apache.activemq.usage.SystemUsage; 047import org.apache.activemq.usage.Usage; 048import org.apache.activemq.usage.UsageListener; 049import org.apache.activemq.util.SubscriptionKey; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener { 054 055 private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); 056 private final ConcurrentMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>(); 057 private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 058 private final SubscriptionKey subscriptionKey; 059 private final boolean keepDurableSubsActive; 060 private final AtomicBoolean active = new AtomicBoolean(); 061 private final AtomicLong offlineTimestamp = new AtomicLong(-1); 062 private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>(); 063 064 public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) 065 throws JMSException { 066 super(broker, usageManager, context, info); 067 this.pending = new StoreDurableSubscriberCursor(broker, context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); 068 this.pending.setSystemUsage(usageManager); 069 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 070 this.keepDurableSubsActive = keepDurableSubsActive; 071 subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 072 } 073 074 public final boolean isActive() { 075 return active.get(); 076 } 077 078 public final long getOfflineTimestamp() { 079 return offlineTimestamp.get(); 080 } 081 082 public void setOfflineTimestamp(long timestamp) { 083 offlineTimestamp.set(timestamp); 084 } 085 086 @Override 087 public boolean isFull() { 088 return !active.get() || super.isFull(); 089 } 090 091 @Override 092 public void gc() { 093 } 094 095 /** 096 * store will have a pending ack for all durables, irrespective of the 097 * selector so we need to ack if node is un-matched 098 */ 099 @Override 100 public void unmatched(MessageReference node) throws IOException { 101 MessageAck ack = new MessageAck(); 102 ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); 103 ack.setMessageID(node.getMessageId()); 104 Destination regionDestination = (Destination) node.getRegionDestination(); 105 regionDestination.acknowledge(this.getContext(), this, ack, node); 106 } 107 108 @Override 109 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 110 // statically configured via maxPageSize 111 } 112 113 @Override 114 public void add(ConnectionContext context, Destination destination) throws Exception { 115 if (!destinations.contains(destination)) { 116 super.add(context, destination); 117 } 118 // do it just once per destination 119 if (durableDestinations.containsKey(destination.getActiveMQDestination())) { 120 return; 121 } 122 durableDestinations.put(destination.getActiveMQDestination(), destination); 123 124 if (active.get() || keepDurableSubsActive) { 125 Topic topic = (Topic) destination; 126 topic.activate(context, this); 127 getSubscriptionStatistics().getEnqueues().add(pending.size()); 128 } else if (destination.getMessageStore() != null) { 129 TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); 130 try { 131 getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName())); 132 } catch (IOException e) { 133 JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e); 134 jmsEx.setLinkedException(e); 135 throw jmsEx; 136 } 137 } 138 dispatchPending(); 139 } 140 141 // used by RetaineMessageSubscriptionRecoveryPolicy 142 public boolean isEmpty(Topic topic) { 143 return pending.isEmpty(topic); 144 } 145 146 public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception { 147 if (!active.get()) { 148 this.context = context; 149 this.info = info; 150 151 LOG.debug("Activating {}", this); 152 if (!keepDurableSubsActive) { 153 for (Destination destination : durableDestinations.values()) { 154 Topic topic = (Topic) destination; 155 add(context, topic); 156 topic.activate(context, this); 157 } 158 159 // On Activation we should update the configuration based on our new consumer info. 160 ActiveMQDestination dest = this.info.getDestination(); 161 if (dest != null && regionBroker.getDestinationPolicy() != null) { 162 PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest); 163 if (entry != null) { 164 entry.configure(broker, usageManager, this); 165 } 166 } 167 } 168 169 synchronized (pendingLock) { 170 if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) { 171 pending.setSystemUsage(memoryManager); 172 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 173 pending.setMaxAuditDepth(getMaxAuditDepth()); 174 pending.setMaxProducersToAudit(getMaxProducersToAudit()); 175 pending.start(); 176 } 177 // use recovery policy every time sub is activated for retroactive topics and consumers 178 for (Destination destination : durableDestinations.values()) { 179 Topic topic = (Topic) destination; 180 if (topic.isAlwaysRetroactive() || info.isRetroactive()) { 181 topic.recoverRetroactiveMessages(context, this); 182 } 183 } 184 } 185 this.active.set(true); 186 this.offlineTimestamp.set(-1); 187 dispatchPending(); 188 this.usageManager.getMemoryUsage().addUsageListener(this); 189 } 190 } 191 192 public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception { 193 LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this); 194 active.set(false); 195 offlineTimestamp.set(System.currentTimeMillis()); 196 this.usageManager.getMemoryUsage().removeUsageListener(this); 197 198 ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>(); 199 List<MessageReference> savedDispateched = null; 200 201 synchronized (pendingLock) { 202 if (!keepDurableSubsActive) { 203 pending.stop(); 204 } 205 206 synchronized (dispatchLock) { 207 for (Destination destination : durableDestinations.values()) { 208 Topic topic = (Topic) destination; 209 if (!keepDurableSubsActive) { 210 topicsToDeactivate.add(topic); 211 } else { 212 topic.getDestinationStatistics().getInflight().subtract(dispatched.size()); 213 } 214 } 215 216 // Before we add these back to pending they need to be in producer order not 217 // dispatch order so we can add them to the front of the pending list. 218 Collections.reverse(dispatched); 219 220 for (final MessageReference node : dispatched) { 221 // Mark the dispatched messages as redelivered for next time. 222 if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) { 223 Integer count = redeliveredMessages.get(node.getMessageId()); 224 if (count != null) { 225 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1)); 226 } else { 227 redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1)); 228 } 229 } 230 if (keepDurableSubsActive && pending.isTransient()) { 231 pending.addMessageFirst(node); 232 pending.rollback(node.getMessageId()); 233 } 234 // createMessageDispatch increments on remove from pending for dispatch 235 node.decrementReferenceCount(); 236 } 237 238 if (!topicsToDeactivate.isEmpty()) { 239 savedDispateched = new ArrayList<MessageReference>(dispatched); 240 } 241 dispatched.clear(); 242 getSubscriptionStatistics().getInflightMessageSize().reset(); 243 } 244 if (!keepDurableSubsActive && pending.isTransient()) { 245 try { 246 pending.reset(); 247 while (pending.hasNext()) { 248 MessageReference node = pending.next(); 249 node.decrementReferenceCount(); 250 pending.remove(); 251 } 252 } finally { 253 pending.release(); 254 } 255 } 256 } 257 for(Topic topic: topicsToDeactivate) { 258 topic.deactivate(context, this, savedDispateched); 259 } 260 prefetchExtension.set(0); 261 } 262 263 @Override 264 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 265 MessageDispatch md = super.createMessageDispatch(node, message); 266 if (node != QueueMessageReference.NULL_MESSAGE) { 267 node.incrementReferenceCount(); 268 Integer count = redeliveredMessages.get(node.getMessageId()); 269 if (count != null) { 270 md.setRedeliveryCounter(count.intValue()); 271 } 272 } 273 return md; 274 } 275 276 @Override 277 public void add(MessageReference node) throws Exception { 278 if (!active.get() && !keepDurableSubsActive) { 279 return; 280 } 281 super.add(node); 282 } 283 284 @Override 285 public void dispatchPending() throws IOException { 286 if (isActive()) { 287 super.dispatchPending(); 288 } 289 } 290 291 public void removePending(MessageReference node) throws IOException { 292 pending.remove(node); 293 } 294 295 @Override 296 protected void doAddRecoveredMessage(MessageReference message) throws Exception { 297 synchronized (pending) { 298 pending.addRecoveredMessage(message); 299 } 300 } 301 302 @Override 303 public int getPendingQueueSize() { 304 if (active.get() || keepDurableSubsActive) { 305 return super.getPendingQueueSize(); 306 } 307 // TODO: need to get from store 308 return 0; 309 } 310 311 @Override 312 public void setSelector(String selector) throws InvalidSelectorException { 313 throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); 314 } 315 316 @Override 317 protected boolean canDispatch(MessageReference node) { 318 return true; // let them go, our dispatchPending gates the active / inactive state. 319 } 320 321 @Override 322 protected boolean trackedInPendingTransaction(MessageReference node) { 323 return !ackedAndPrepared.isEmpty() && ackedAndPrepared.contains(node.getMessageId()); 324 } 325 326 @Override 327 protected void acknowledge(ConnectionContext context, MessageAck ack, final MessageReference node) throws IOException { 328 this.setTimeOfLastMessageAck(System.currentTimeMillis()); 329 Destination regionDestination = (Destination) node.getRegionDestination(); 330 regionDestination.acknowledge(context, this, ack, node); 331 redeliveredMessages.remove(node.getMessageId()); 332 node.decrementReferenceCount(); 333 if (context.isInTransaction() && context.getTransaction().getTransactionId().isXATransaction()) { 334 context.getTransaction().addSynchronization(new Synchronization() { 335 336 @Override 337 public void beforeCommit() throws Exception { 338 // post xa prepare call 339 synchronized (pendingLock) { 340 ackedAndPrepared.add(node.getMessageId()); 341 } 342 } 343 344 @Override 345 public void afterCommit() throws Exception { 346 synchronized (pendingLock) { 347 // may be in the cursor post activate/load from the store 348 pending.remove(node); 349 ackedAndPrepared.remove(node.getMessageId()); 350 } 351 } 352 353 @Override 354 public void afterRollback() throws Exception { 355 synchronized (pendingLock) { 356 ackedAndPrepared.remove(node.getMessageId()); 357 } 358 dispatchPending(); 359 } 360 }); 361 } 362 ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); 363 if (info.isNetworkSubscription()) { 364 ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); 365 } 366 } 367 368 @Override 369 public synchronized String toString() { 370 return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" 371 + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() 372 + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); 373 } 374 375 public SubscriptionKey getSubscriptionKey() { 376 return subscriptionKey; 377 } 378 379 /** 380 * Release any references that we are holding. 381 */ 382 @Override 383 public void destroy() { 384 synchronized (pendingLock) { 385 try { 386 pending.reset(); 387 while (pending.hasNext()) { 388 MessageReference node = pending.next(); 389 node.decrementReferenceCount(); 390 } 391 392 } finally { 393 pending.release(); 394 pending.clear(); 395 } 396 } 397 synchronized (dispatchLock) { 398 for (MessageReference node : dispatched) { 399 node.decrementReferenceCount(); 400 } 401 dispatched.clear(); 402 ackedAndPrepared.clear(); 403 } 404 setSlowConsumer(false); 405 } 406 407 @Override 408 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 409 if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { 410 try { 411 dispatchPending(); 412 } catch (IOException e) { 413 LOG.warn("problem calling dispatchMatched", e); 414 } 415 } 416 } 417 418 @Override 419 protected boolean isDropped(MessageReference node) { 420 return false; 421 } 422 423 public boolean isKeepDurableSubsActive() { 424 return keepDurableSubsActive; 425 } 426}