001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.ArrayList; 020import java.util.HashSet; 021import java.util.Iterator; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.Timer; 026import java.util.TimerTask; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029 030import javax.jms.InvalidDestinationException; 031import javax.jms.JMSException; 032 033import org.apache.activemq.advisory.AdvisorySupport; 034import org.apache.activemq.broker.ConnectionContext; 035import org.apache.activemq.broker.region.policy.PolicyEntry; 036import org.apache.activemq.command.ActiveMQDestination; 037import org.apache.activemq.command.ConnectionId; 038import org.apache.activemq.command.ConsumerId; 039import org.apache.activemq.command.ConsumerInfo; 040import org.apache.activemq.command.RemoveSubscriptionInfo; 041import org.apache.activemq.command.SessionId; 042import org.apache.activemq.command.SubscriptionInfo; 043import org.apache.activemq.store.TopicMessageStore; 044import org.apache.activemq.thread.TaskRunnerFactory; 045import org.apache.activemq.usage.SystemUsage; 046import org.apache.activemq.util.LongSequenceGenerator; 047import org.apache.activemq.util.SubscriptionKey; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * 053 */ 054public class TopicRegion extends AbstractRegion { 055 private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class); 056 protected final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 057 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); 058 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); 059 private boolean keepDurableSubsActive; 060 061 private Timer cleanupTimer; 062 private TimerTask cleanupTask; 063 064 public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 065 DestinationFactory destinationFactory) { 066 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 067 if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { 068 this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); 069 this.cleanupTask = new TimerTask() { 070 @Override 071 public void run() { 072 doCleanup(); 073 } 074 }; 075 this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); 076 } 077 } 078 079 @Override 080 public void stop() throws Exception { 081 super.stop(); 082 if (cleanupTimer != null) { 083 cleanupTimer.cancel(); 084 } 085 } 086 087 public void doCleanup() { 088 long now = System.currentTimeMillis(); 089 for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { 090 DurableTopicSubscription sub = entry.getValue(); 091 if (!sub.isActive()) { 092 long offline = sub.getOfflineTimestamp(); 093 if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { 094 LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 095 try { 096 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 097 info.setClientId(entry.getKey().getClientId()); 098 info.setSubscriptionName(entry.getKey().getSubscriptionName()); 099 ConnectionContext context = new ConnectionContext(); 100 context.setBroker(broker); 101 context.setClientId(entry.getKey().getClientId()); 102 removeSubscription(context, info); 103 } catch (Exception e) { 104 LOG.error("Failed to remove inactive durable subscriber", e); 105 } 106 } 107 } 108 } 109 } 110 111 @Override 112 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 113 if (info.isDurable()) { 114 if (broker.getBrokerService().isRejectDurableConsumers()) { 115 throw new JMSException("Durable Consumers are not allowed"); 116 } 117 ActiveMQDestination destination = info.getDestination(); 118 if (!destination.isPattern()) { 119 // Make sure the destination is created. 120 lookup(context, destination,true); 121 } 122 String clientId = context.getClientId(); 123 String subscriptionName = info.getSubscriptionName(); 124 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 125 DurableTopicSubscription sub = durableSubscriptions.get(key); 126 if (sub != null) { 127 // throw this exception only if link stealing is off 128 if (!context.isAllowLinkStealing() && sub.isActive()) { 129 throw new JMSException("Durable consumer is in use for client: " + clientId + 130 " and subscriptionName: " + subscriptionName); 131 } 132 // Has the selector changed?? 133 if (hasDurableSubChanged(info, sub.getConsumerInfo())) { 134 // Remove the consumer first then add it. 135 durableSubscriptions.remove(key); 136 destinationsLock.readLock().lock(); 137 try { 138 for (Destination dest : destinations.values()) { 139 //Account for virtual destinations 140 if (dest instanceof Topic){ 141 Topic topic = (Topic)dest; 142 topic.deleteSubscription(context, key); 143 } 144 } 145 } finally { 146 destinationsLock.readLock().unlock(); 147 } 148 super.removeConsumer(context, sub.getConsumerInfo()); 149 super.addConsumer(context, info); 150 sub = durableSubscriptions.get(key); 151 } else { 152 // Change the consumer id key of the durable sub. 153 if (sub.getConsumerInfo().getConsumerId() != null) { 154 subscriptions.remove(sub.getConsumerInfo().getConsumerId()); 155 } 156 // set the info and context to the new ones. 157 // this is set in the activate() call below, but 158 // that call is a NOP if it is already active. 159 // hence need to set here and deactivate it first 160 if ((sub.context != context) || (sub.info != info)) { 161 sub.info = info; 162 sub.context = context; 163 sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); 164 } 165 subscriptions.put(info.getConsumerId(), sub); 166 } 167 } else { 168 super.addConsumer(context, info); 169 sub = durableSubscriptions.get(key); 170 if (sub == null) { 171 throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + 172 " for two different durable subscriptions clientID: " + key.getClientId() + 173 " subscriberName: " + key.getSubscriptionName()); 174 } 175 } 176 sub.activate(usageManager, context, info, broker); 177 return sub; 178 } else { 179 return super.addConsumer(context, info); 180 } 181 } 182 183 @Override 184 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 185 if (info.isDurable()) { 186 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 187 DurableTopicSubscription sub = durableSubscriptions.get(key); 188 if (sub != null) { 189 // deactivate only if given context is same 190 // as what is in the sub. otherwise, during linksteal 191 // sub will get new context, but will be removed here 192 if (sub.getContext() == context) 193 sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); 194 } 195 } else { 196 super.removeConsumer(context, info); 197 } 198 } 199 200 @Override 201 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 202 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName()); 203 DurableTopicSubscription sub = durableSubscriptions.get(key); 204 if (sub == null) { 205 throw new InvalidDestinationException("No durable subscription exists for clientID: " + 206 info.getClientId() + " and subscriptionName: " + 207 info.getSubscriptionName()); 208 } 209 if (sub.isActive()) { 210 throw new JMSException("Durable consumer is in use"); 211 } else { 212 durableSubscriptions.remove(key); 213 } 214 215 destinationsLock.readLock().lock(); 216 try { 217 for (Destination dest : destinations.values()) { 218 if (dest instanceof Topic){ 219 Topic topic = (Topic)dest; 220 topic.deleteSubscription(context, key); 221 } else if (dest instanceof DestinationFilter) { 222 DestinationFilter filter = (DestinationFilter) dest; 223 filter.deleteSubscription(context, key); 224 } 225 } 226 } finally { 227 destinationsLock.readLock().unlock(); 228 } 229 230 if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) { 231 super.removeConsumer(context, sub.getConsumerInfo()); 232 } else { 233 // try destroying inactive subscriptions 234 destroySubscription(sub); 235 } 236 } 237 238 @Override 239 public String toString() { 240 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; 241 } 242 243 @Override 244 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 245 List<Subscription> rc = super.addSubscriptionsForDestination(context, dest); 246 Set<Subscription> dupChecker = new HashSet<Subscription>(rc); 247 248 TopicMessageStore store = (TopicMessageStore)dest.getMessageStore(); 249 // Eagerly recover the durable subscriptions 250 if (store != null) { 251 SubscriptionInfo[] infos = store.getAllSubscriptions(); 252 for (int i = 0; i < infos.length; i++) { 253 254 SubscriptionInfo info = infos[i]; 255 LOG.debug("Restoring durable subscription: {}", info); 256 SubscriptionKey key = new SubscriptionKey(info); 257 258 // A single durable sub may be subscribing to multiple topics. 259 // so it might exist already. 260 DurableTopicSubscription sub = durableSubscriptions.get(key); 261 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); 262 if (sub == null) { 263 ConnectionContext c = new ConnectionContext(); 264 c.setBroker(context.getBroker()); 265 c.setClientId(key.getClientId()); 266 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); 267 sub = (DurableTopicSubscription)createSubscription(c, consumerInfo); 268 sub.setOfflineTimestamp(System.currentTimeMillis()); 269 } 270 271 if (dupChecker.contains(sub)) { 272 continue; 273 } 274 275 dupChecker.add(sub); 276 rc.add(sub); 277 dest.addSubscription(context, sub); 278 } 279 280 // Now perhaps there other durable subscriptions (via wild card) 281 // that would match this destination.. 282 durableSubscriptions.values(); 283 for (DurableTopicSubscription sub : durableSubscriptions.values()) { 284 // Skip over subscriptions that we already added.. 285 if (dupChecker.contains(sub)) { 286 continue; 287 } 288 289 if (sub.matches(dest.getActiveMQDestination())) { 290 rc.add(sub); 291 dest.addSubscription(context, sub); 292 } 293 } 294 } 295 return rc; 296 } 297 298 public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { 299 ConsumerInfo rc = new ConsumerInfo(); 300 rc.setSelector(info.getSelector()); 301 rc.setSubscriptionName(info.getSubscriptionName()); 302 rc.setDestination(info.getSubscribedDestination()); 303 rc.setConsumerId(createConsumerId()); 304 return rc; 305 } 306 307 private ConsumerId createConsumerId() { 308 return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId()); 309 } 310 311 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 312 if (broker.getDestinationPolicy() != null) { 313 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 314 if (entry != null) { 315 entry.configure(broker,topic); 316 } 317 } 318 } 319 320 @Override 321 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { 322 ActiveMQDestination destination = info.getDestination(); 323 324 if (info.isDurable()) { 325 if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 326 throw new JMSException("Cannot create a durable subscription for an advisory Topic"); 327 } 328 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 329 DurableTopicSubscription sub = durableSubscriptions.get(key); 330 331 if (sub == null) { 332 333 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); 334 335 if (destination != null && broker.getDestinationPolicy() != null) { 336 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 337 if (entry != null) { 338 entry.configure(broker, usageManager, sub); 339 } 340 } 341 durableSubscriptions.put(key, sub); 342 } else { 343 throw new JMSException("Durable subscription is already active for clientID: " + 344 context.getClientId() + " and subscriptionName: " + 345 info.getSubscriptionName()); 346 } 347 return sub; 348 } 349 try { 350 TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager); 351 // lets configure the subscription depending on the destination 352 if (destination != null && broker.getDestinationPolicy() != null) { 353 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 354 if (entry != null) { 355 entry.configure(broker, usageManager, answer); 356 } 357 } 358 answer.init(); 359 return answer; 360 } catch (Exception e) { 361 LOG.error("Failed to create TopicSubscription ", e); 362 JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); 363 jmsEx.setLinkedException(e); 364 throw jmsEx; 365 } 366 } 367 368 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) { 369 if (info1.getSelector() != null ^ info2.getSelector() != null) { 370 return true; 371 } 372 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 373 return true; 374 } 375 return !info1.getDestination().equals(info2.getDestination()); 376 } 377 378 @Override 379 protected Set<ActiveMQDestination> getInactiveDestinations() { 380 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); 381 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { 382 ActiveMQDestination dest = iter.next(); 383 if (!dest.isTopic()) { 384 iter.remove(); 385 } 386 } 387 return inactiveDestinations; 388 } 389 390 public DurableTopicSubscription lookupSubscription(String subscriptionName, String clientId) { 391 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 392 if (durableSubscriptions.containsKey(key)) { 393 return durableSubscriptions.get(key); 394 } 395 396 return null; 397 } 398 399 public List<DurableTopicSubscription> lookupSubscriptions(String clientId) { 400 List<DurableTopicSubscription> result = new ArrayList<DurableTopicSubscription>(); 401 402 for (Map.Entry<SubscriptionKey, DurableTopicSubscription> subscriptionEntry : durableSubscriptions.entrySet()) { 403 if (subscriptionEntry.getKey().getClientId().equals(clientId)) { 404 result.add(subscriptionEntry.getValue()); 405 } 406 } 407 408 return result; 409 } 410 411 public boolean isKeepDurableSubsActive() { 412 return keepDurableSubsActive; 413 } 414 415 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 416 this.keepDurableSubsActive = keepDurableSubsActive; 417 } 418 419 public boolean durableSubscriptionExists(SubscriptionKey key) { 420 return this.durableSubscriptions.containsKey(key); 421 } 422 423 public DurableTopicSubscription getDurableSubscription(SubscriptionKey key) { 424 return durableSubscriptions.get(key); 425 } 426 427 public Map<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() { 428 return durableSubscriptions; 429 } 430}