001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.Iterator; 024import java.util.LinkedHashMap; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.broker.Broker; 032import org.apache.activemq.broker.BrokerFilter; 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.ConnectionContext; 035import org.apache.activemq.broker.ProducerBrokerExchange; 036import org.apache.activemq.broker.region.BaseDestination; 037import org.apache.activemq.broker.region.Destination; 038import org.apache.activemq.broker.region.DurableTopicSubscription; 039import org.apache.activemq.broker.region.MessageReference; 040import org.apache.activemq.broker.region.RegionBroker; 041import org.apache.activemq.broker.region.Subscription; 042import org.apache.activemq.broker.region.TopicRegion; 043import org.apache.activemq.broker.region.TopicSubscription; 044import org.apache.activemq.broker.region.virtual.VirtualDestination; 045import org.apache.activemq.broker.region.virtual.VirtualTopic; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQMessage; 048import org.apache.activemq.command.ActiveMQTopic; 049import org.apache.activemq.command.BrokerInfo; 050import org.apache.activemq.command.Command; 051import org.apache.activemq.command.ConnectionId; 052import org.apache.activemq.command.ConnectionInfo; 053import org.apache.activemq.command.ConsumerId; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.DestinationInfo; 056import org.apache.activemq.command.Message; 057import org.apache.activemq.command.MessageId; 058import org.apache.activemq.command.ProducerId; 059import org.apache.activemq.command.ProducerInfo; 060import org.apache.activemq.command.RemoveSubscriptionInfo; 061import org.apache.activemq.command.SessionId; 062import org.apache.activemq.filter.DestinationPath; 063import org.apache.activemq.security.SecurityContext; 064import org.apache.activemq.state.ProducerState; 065import org.apache.activemq.usage.Usage; 066import org.apache.activemq.util.IdGenerator; 067import org.apache.activemq.util.LongSequenceGenerator; 068import org.apache.activemq.util.SubscriptionKey; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072/** 073 * This broker filter handles tracking the state of the broker for purposes of 074 * publishing advisory messages to advisory consumers. 075 */ 076public class AdvisoryBroker extends BrokerFilter { 077 078 private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class); 079 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 080 081 protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); 082 083 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 084 protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>(); 085 086 /** 087 * This is a set to track all of the virtual destinations that have been added to the broker so 088 * they can be easily referenced later. 089 */ 090 protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>()); 091 /** 092 * This is a map to track all consumers that exist on the virtual destination so that we can fire 093 * an advisory later when they go away to remove the demand. 094 */ 095 protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>(); 096 /** 097 * This is a map to track unique demand for the existence of a virtual destination so we make sure 098 * we don't send duplicate advisories. 099 */ 100 protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>(); 101 102 protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); 103 protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); 104 protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); 105 protected final ProducerId advisoryProducerId = new ProducerId(); 106 107 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 108 109 private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); 110 111 public AdvisoryBroker(Broker next) { 112 super(next); 113 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 114 } 115 116 @Override 117 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 118 super.addConnection(context, info); 119 120 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 121 // do not distribute passwords in advisory messages. usernames okay 122 ConnectionInfo copy = info.copy(); 123 copy.setPassword(""); 124 fireAdvisory(context, topic, copy); 125 connections.put(copy.getConnectionId(), copy); 126 } 127 128 @Override 129 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 130 Subscription answer = super.addConsumer(context, info); 131 132 // Don't advise advisory topics. 133 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 134 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); 135 consumersLock.writeLock().lock(); 136 try { 137 consumers.put(info.getConsumerId(), info); 138 139 //check if this is a consumer on a destination that matches a virtual destination 140 if (getBrokerService().isUseVirtualDestSubs()) { 141 for (VirtualDestination virtualDestination : virtualDestinations) { 142 if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { 143 fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); 144 } 145 } 146 } 147 } finally { 148 consumersLock.writeLock().unlock(); 149 } 150 fireConsumerAdvisory(context, info.getDestination(), topic, info); 151 } else { 152 // We need to replay all the previously collected state objects 153 // for this newly added consumer. 154 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) { 155 // Replay the connections. 156 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) { 157 ConnectionInfo value = iter.next(); 158 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 159 fireAdvisory(context, topic, value, info.getConsumerId()); 160 } 161 } 162 163 // We check here whether the Destination is Temporary Destination specific or not since we 164 // can avoid sending advisory messages to the consumer if it only wants Temporary Destination 165 // notifications. If its not just temporary destination related destinations then we have 166 // to send them all, a composite destination could want both. 167 if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) { 168 // Replay the temporary destinations. 169 for (DestinationInfo destination : destinations.values()) { 170 if (destination.getDestination().isTemporary()) { 171 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 172 fireAdvisory(context, topic, destination, info.getConsumerId()); 173 } 174 } 175 } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { 176 // Replay all the destinations. 177 for (DestinationInfo destination : destinations.values()) { 178 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 179 fireAdvisory(context, topic, destination, info.getConsumerId()); 180 } 181 } 182 183 // Replay the producers. 184 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) { 185 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) { 186 ProducerInfo value = iter.next(); 187 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); 188 fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); 189 } 190 } 191 192 // Replay the consumers. 193 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { 194 consumersLock.readLock().lock(); 195 try { 196 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { 197 ConsumerInfo value = iter.next(); 198 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); 199 fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); 200 } 201 } finally { 202 consumersLock.readLock().unlock(); 203 } 204 } 205 206 // Replay the virtual destination consumers. 207 if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 208 for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) { 209 ConsumerInfo key = iter.next(); 210 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination()); 211 fireConsumerAdvisory(context, key.getDestination(), topic, key); 212 } 213 } 214 215 // Replay network bridges 216 if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { 217 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { 218 BrokerInfo key = iter.next(); 219 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 220 fireAdvisory(context, topic, key, null, networkBridges.get(key)); 221 } 222 } 223 } 224 return answer; 225 } 226 227 @Override 228 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 229 super.addProducer(context, info); 230 231 // Don't advise advisory topics. 232 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 233 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); 234 fireProducerAdvisory(context, info.getDestination(), topic, info); 235 producers.put(info.getProducerId(), info); 236 } 237 } 238 239 @Override 240 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { 241 Destination answer = super.addDestination(context, destination, create); 242 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 243 //for queues, create demand if isUseVirtualDestSubsOnCreation is true 244 if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) { 245 //check if this new destination matches a virtual destination that exists 246 for (VirtualDestination virtualDestination : virtualDestinations) { 247 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 248 fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); 249 } 250 } 251 } 252 253 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); 254 DestinationInfo previous = destinations.putIfAbsent(destination, info); 255 if (previous == null) { 256 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 257 fireAdvisory(context, topic, info); 258 } 259 } 260 return answer; 261 } 262 263 @Override 264 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 265 ActiveMQDestination destination = info.getDestination(); 266 next.addDestinationInfo(context, info); 267 268 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 269 DestinationInfo previous = destinations.putIfAbsent(destination, info); 270 if (previous == null) { 271 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 272 fireAdvisory(context, topic, info); 273 } 274 } 275 } 276 277 @Override 278 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 279 super.removeDestination(context, destination, timeout); 280 DestinationInfo info = destinations.remove(destination); 281 if (info != null) { 282 283 //on destination removal, remove all demand if using virtual dest subs 284 if (getBrokerService().isUseVirtualDestSubs()) { 285 for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) { 286 //find all consumers for this virtual destination 287 VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo); 288 289 //find a consumer that matches this virtualDest and destination 290 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 291 //in case of multiple matches 292 VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); 293 ConsumerInfo i = brokerConsumerDests.get(key); 294 if (consumerInfo.equals(i)) { 295 if (brokerConsumerDests.remove(key) != null) { 296 fireVirtualDestinationRemoveAdvisory(context, consumerInfo); 297 break; 298 } 299 } 300 } 301 } 302 } 303 304 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 305 info = info.copy(); 306 info.setDestination(destination); 307 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 308 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 309 fireAdvisory(context, topic, info); 310 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination); 311 for (ActiveMQTopic advisoryDestination : advisoryDestinations) { 312 try { 313 next.removeDestination(context, advisoryDestination, -1); 314 } catch (Exception expectedIfDestinationDidNotExistYet) { 315 } 316 } 317 } 318 } 319 320 @Override 321 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { 322 super.removeDestinationInfo(context, destInfo); 323 DestinationInfo info = destinations.remove(destInfo.getDestination()); 324 if (info != null) { 325 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 326 info = info.copy(); 327 info.setDestination(destInfo.getDestination()); 328 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 329 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination()); 330 fireAdvisory(context, topic, info); 331 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination()); 332 for (ActiveMQTopic advisoryDestination : advisoryDestinations) { 333 try { 334 next.removeDestination(context, advisoryDestination, -1); 335 } catch (Exception expectedIfDestinationDidNotExistYet) { 336 } 337 } 338 } 339 } 340 341 @Override 342 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 343 super.removeConnection(context, info, error); 344 345 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 346 fireAdvisory(context, topic, info.createRemoveCommand()); 347 connections.remove(info.getConnectionId()); 348 } 349 350 @Override 351 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 352 super.removeConsumer(context, info); 353 354 // Don't advise advisory topics. 355 ActiveMQDestination dest = info.getDestination(); 356 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 357 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 358 consumersLock.writeLock().lock(); 359 try { 360 consumers.remove(info.getConsumerId()); 361 362 //remove the demand for this consumer if it matches a virtual destination 363 if(getBrokerService().isUseVirtualDestSubs()) { 364 fireVirtualDestinationRemoveAdvisory(context, info); 365 } 366 } finally { 367 consumersLock.writeLock().unlock(); 368 } 369 if (!dest.isTemporary() || destinations.containsKey(dest)) { 370 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); 371 } 372 } 373 } 374 375 @Override 376 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 377 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 378 379 RegionBroker regionBroker = null; 380 if (next instanceof RegionBroker) { 381 regionBroker = (RegionBroker) next; 382 } else { 383 BrokerService service = next.getBrokerService(); 384 regionBroker = (RegionBroker) service.getRegionBroker(); 385 } 386 387 if (regionBroker == null) { 388 LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call"); 389 throw new IllegalStateException("No RegionBroker found."); 390 } 391 392 DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key); 393 394 super.removeSubscription(context, info); 395 396 if (sub == null) { 397 LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub"); 398 return; 399 } 400 401 ActiveMQDestination dest = sub.getConsumerInfo().getDestination(); 402 403 // Don't advise advisory topics. 404 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 405 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 406 fireConsumerAdvisory(context, dest, topic, info); 407 } 408 409 } 410 411 @Override 412 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 413 super.removeProducer(context, info); 414 415 // Don't advise advisory topics. 416 ActiveMQDestination dest = info.getDestination(); 417 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { 418 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); 419 producers.remove(info.getProducerId()); 420 if (!dest.isTemporary() || destinations.containsKey(dest)) { 421 fireProducerAdvisory(context, dest, topic, info.createRemoveCommand()); 422 } 423 } 424 } 425 426 @Override 427 public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) { 428 super.messageExpired(context, messageReference, subscription); 429 try { 430 if (!messageReference.isAdvisory()) { 431 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 432 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination()); 433 Message payload = messageReference.getMessage().copy(); 434 payload.clearBody(); 435 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 436 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 437 fireAdvisory(context, topic, payload, null, advisoryMessage); 438 } 439 } catch (Exception e) { 440 handleFireFailure("expired", e); 441 } 442 } 443 444 @Override 445 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 446 super.messageConsumed(context, messageReference); 447 try { 448 if (!messageReference.isAdvisory()) { 449 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 450 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination()); 451 Message payload = messageReference.getMessage().copy(); 452 payload.clearBody(); 453 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 454 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 455 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 456 fireAdvisory(context, topic, payload, null, advisoryMessage); 457 } 458 } catch (Exception e) { 459 handleFireFailure("consumed", e); 460 } 461 } 462 463 @Override 464 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 465 super.messageDelivered(context, messageReference); 466 try { 467 if (!messageReference.isAdvisory()) { 468 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 469 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination()); 470 Message payload = messageReference.getMessage().copy(); 471 payload.clearBody(); 472 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 473 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 474 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 475 fireAdvisory(context, topic, payload, null, advisoryMessage); 476 } 477 } catch (Exception e) { 478 handleFireFailure("delivered", e); 479 } 480 } 481 482 @Override 483 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 484 super.messageDiscarded(context, sub, messageReference); 485 try { 486 if (!messageReference.isAdvisory()) { 487 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 488 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination()); 489 Message payload = messageReference.getMessage().copy(); 490 payload.clearBody(); 491 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 492 if (sub instanceof TopicSubscription) { 493 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); 494 } 495 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 496 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); 497 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 498 499 fireAdvisory(context, topic, payload, null, advisoryMessage); 500 } 501 } catch (Exception e) { 502 handleFireFailure("discarded", e); 503 } 504 } 505 506 @Override 507 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 508 super.slowConsumer(context, destination, subs); 509 try { 510 if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { 511 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination()); 512 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 513 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString()); 514 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage); 515 } 516 } catch (Exception e) { 517 handleFireFailure("slow consumer", e); 518 } 519 } 520 521 @Override 522 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) { 523 super.fastProducer(context, producerInfo, destination); 524 try { 525 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 526 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination); 527 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 528 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); 529 fireAdvisory(context, topic, producerInfo, null, advisoryMessage); 530 } 531 } catch (Exception e) { 532 handleFireFailure("fast producer", e); 533 } 534 } 535 536 private final IdGenerator connectionIdGenerator = new IdGenerator("advisory"); 537 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 538 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 539 540 @Override 541 public void virtualDestinationAdded(ConnectionContext context, 542 VirtualDestination virtualDestination) { 543 super.virtualDestinationAdded(context, virtualDestination); 544 545 if (virtualDestinations.add(virtualDestination)) { 546 try { 547 // Don't advise advisory topics. 548 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { 549 550 //create demand for consumers on virtual destinations 551 consumersLock.readLock().lock(); 552 try { 553 //loop through existing destinations to see if any match this newly 554 //created virtual destination 555 if (getBrokerService().isUseVirtualDestSubsOnCreation()) { 556 //for matches that are a queue, fire an advisory for demand 557 for (ActiveMQDestination destination : destinations.keySet()) { 558 if(destination.isQueue()) { 559 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 560 fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); 561 } 562 } 563 } 564 } 565 566 //loop through existing consumers to see if any of them are consuming on a destination 567 //that matches the new virtual destination 568 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { 569 ConsumerInfo info = iter.next(); 570 if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { 571 fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); 572 } 573 } 574 } finally { 575 consumersLock.readLock().unlock(); 576 } 577 } 578 } catch (Exception e) { 579 handleFireFailure("virtualDestinationAdded", e); 580 } 581 } 582 } 583 584 private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest, 585 VirtualDestination virtualDestination) throws Exception { 586 //if no consumer info, we need to create one - this is the case when an advisory is fired 587 //because of the existence of a destination matching a virtual destination 588 if (info == null) { 589 //store the virtual destination and the activeMQDestination as a pair so that we can keep track 590 //of all matching forwarded destinations that caused demand 591 VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest); 592 if (brokerConsumerDests.get(pair) == null) { 593 ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); 594 SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); 595 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 596 info = new ConsumerInfo(consumerId); 597 598 if(brokerConsumerDests.putIfAbsent(pair, info) == null) { 599 LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info); 600 setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); 601 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); 602 603 if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { 604 LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); 605 fireConsumerAdvisory(context, info.getDestination(), topic, info); 606 } 607 } 608 } 609 //this is the case of a real consumer coming online 610 } else { 611 info = info.copy(); 612 setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest); 613 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); 614 615 if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { 616 fireConsumerAdvisory(context, info.getDestination(), topic, info); 617 } 618 } 619 } 620 621 /** 622 * Sets the virtual destination on the ConsumerInfo 623 * If this is a VirtualTopic then the destination used will be the actual topic subscribed 624 * to in order to track demand properly 625 * 626 * @param info 627 * @param virtualDestination 628 * @param activeMQDest 629 */ 630 private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) { 631 info.setDestination(virtualDestination.getVirtualDestination()); 632 if (virtualDestination instanceof VirtualTopic) { 633 VirtualTopic vt = (VirtualTopic) virtualDestination; 634 String prefix = vt.getPrefix() != null ? vt.getPrefix() : ""; 635 String postfix = vt.getPostfix() != null ? vt.getPostfix() : ""; 636 if (prefix.endsWith(".")) { 637 prefix = prefix.substring(0, prefix.length() - 1); 638 } 639 if (postfix.startsWith(".")) { 640 postfix = postfix.substring(1, postfix.length()); 641 } 642 ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null; 643 ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null; 644 645 String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {}; 646 String[] activeMQDestPaths = activeMQDest.getDestinationPaths(); 647 String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {}; 648 649 //sanity check 650 if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) { 651 String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length, 652 activeMQDestPaths.length - postfixPaths.length); 653 654 ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath)); 655 info.setDestination(newTopic); 656 } 657 } 658 } 659 660 @Override 661 public void virtualDestinationRemoved(ConnectionContext context, 662 VirtualDestination virtualDestination) { 663 super.virtualDestinationRemoved(context, virtualDestination); 664 665 if (virtualDestinations.remove(virtualDestination)) { 666 try { 667 consumersLock.readLock().lock(); 668 try { 669 // remove the demand created by the addition of the virtual destination 670 if (getBrokerService().isUseVirtualDestSubsOnCreation()) { 671 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { 672 for (ConsumerInfo info : virtualDestinationConsumers.keySet()) { 673 //find all consumers for this virtual destination 674 if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { 675 fireVirtualDestinationRemoveAdvisory(context, info); 676 } 677 678 //check consumers created for the existence of a destination to see if they 679 //match the consumerinfo and clean up 680 for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { 681 ConsumerInfo i = brokerConsumerDests.get(activeMQDest); 682 if (info.equals(i)) { 683 brokerConsumerDests.remove(activeMQDest); 684 } 685 } 686 } 687 } 688 } 689 } finally { 690 consumersLock.readLock().unlock(); 691 } 692 } catch (Exception e) { 693 handleFireFailure("virtualDestinationAdded", e); 694 } 695 } 696 } 697 698 private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context, 699 ConsumerInfo info) throws Exception { 700 701 VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); 702 if (virtualDestination != null) { 703 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); 704 705 ActiveMQDestination dest = info.getDestination(); 706 707 if (!dest.isTemporary() || destinations.containsKey(dest)) { 708 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); 709 } 710 } 711 } 712 713 @Override 714 public void isFull(ConnectionContext context, Destination destination, Usage usage) { 715 super.isFull(context, destination, usage); 716 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) { 717 try { 718 719 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination()); 720 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 721 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName()); 722 advisoryMessage.setLongProperty(AdvisorySupport.MSG_PROPERTY_USAGE_COUNT, usage.getUsage()); 723 fireAdvisory(context, topic, null, null, advisoryMessage); 724 725 } catch (Exception e) { 726 handleFireFailure("is full", e); 727 } 728 } 729 } 730 731 @Override 732 public void nowMasterBroker() { 733 super.nowMasterBroker(); 734 try { 735 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); 736 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 737 ConnectionContext context = new ConnectionContext(); 738 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 739 context.setBroker(getBrokerService().getBroker()); 740 fireAdvisory(context, topic, null, null, advisoryMessage); 741 } catch (Exception e) { 742 handleFireFailure("now master broker", e); 743 } 744 } 745 746 @Override 747 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 748 Subscription subscription, Throwable poisonCause) { 749 boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 750 if (wasDLQd) { 751 try { 752 if (!messageReference.isAdvisory()) { 753 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 754 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination()); 755 Message payload = messageReference.getMessage().copy(); 756 payload.clearBody(); 757 fireAdvisory(context, topic, payload); 758 } 759 } catch (Exception e) { 760 handleFireFailure("add to DLQ", e); 761 } 762 } 763 764 return wasDLQd; 765 } 766 767 @Override 768 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { 769 try { 770 if (brokerInfo != null) { 771 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 772 advisoryMessage.setBooleanProperty("started", true); 773 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex); 774 advisoryMessage.setStringProperty("remoteIp", remoteIp); 775 networkBridges.putIfAbsent(brokerInfo, advisoryMessage); 776 777 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 778 779 ConnectionContext context = new ConnectionContext(); 780 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 781 context.setBroker(getBrokerService().getBroker()); 782 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 783 } 784 } catch (Exception e) { 785 handleFireFailure("network bridge started", e); 786 } 787 } 788 789 @Override 790 public void networkBridgeStopped(BrokerInfo brokerInfo) { 791 try { 792 if (brokerInfo != null) { 793 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 794 advisoryMessage.setBooleanProperty("started", false); 795 networkBridges.remove(brokerInfo); 796 797 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 798 799 ConnectionContext context = new ConnectionContext(); 800 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 801 context.setBroker(getBrokerService().getBroker()); 802 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 803 } 804 } catch (Exception e) { 805 handleFireFailure("network bridge stopped", e); 806 } 807 } 808 809 private void handleFireFailure(String message, Throwable cause) { 810 LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); 811 LOG.debug("{} detail: {}", message, cause, cause); 812 } 813 814 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { 815 fireAdvisory(context, topic, command, null); 816 } 817 818 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 819 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 820 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 821 } 822 823 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception { 824 fireConsumerAdvisory(context, consumerDestination, topic, command, null); 825 } 826 827 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 828 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 829 int count = 0; 830 Set<Destination> set = getDestinations(consumerDestination); 831 if (set != null) { 832 for (Destination dest : set) { 833 count += dest.getDestinationStatistics().getConsumers().getCount(); 834 } 835 } 836 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count); 837 838 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 839 } 840 841 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { 842 fireProducerAdvisory(context, producerDestination, topic, command, null); 843 } 844 845 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 846 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 847 int count = 0; 848 if (producerDestination != null) { 849 Set<Destination> set = getDestinations(producerDestination); 850 if (set != null) { 851 for (Destination dest : set) { 852 count += dest.getDestinationStatistics().getProducers().getCount(); 853 } 854 } 855 } 856 advisoryMessage.setIntProperty("producerCount", count); 857 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 858 } 859 860 public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { 861 //set properties 862 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); 863 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; 864 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); 865 866 String url = getBrokerService().getVmConnectorURI().toString(); 867 if (getBrokerService().getDefaultSocketURIString() != null) { 868 url = getBrokerService().getDefaultSocketURIString(); 869 } 870 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); 871 872 //set the data structure 873 advisoryMessage.setDataStructure(command); 874 advisoryMessage.setPersistent(false); 875 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 876 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); 877 advisoryMessage.setTargetConsumerId(targetConsumerId); 878 advisoryMessage.setDestination(topic); 879 advisoryMessage.setResponseRequired(false); 880 advisoryMessage.setProducerId(advisoryProducerId); 881 boolean originalFlowControl = context.isProducerFlowControl(); 882 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 883 producerExchange.setConnectionContext(context); 884 producerExchange.setMutable(true); 885 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 886 try { 887 context.setProducerFlowControl(false); 888 next.send(producerExchange, advisoryMessage); 889 } finally { 890 context.setProducerFlowControl(originalFlowControl); 891 } 892 } 893 894 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() { 895 return connections; 896 } 897 898 public Collection<ConsumerInfo> getAdvisoryConsumers() { 899 consumersLock.readLock().lock(); 900 try { 901 return new ArrayList<ConsumerInfo>(consumers.values()); 902 } finally { 903 consumersLock.readLock().unlock(); 904 } 905 } 906 907 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() { 908 return producers; 909 } 910 911 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { 912 return destinations; 913 } 914 915 private class VirtualConsumerPair { 916 private final VirtualDestination virtualDestination; 917 918 //destination that matches this virtualDestination as part target 919 //this is so we can keep track of more than one destination that might 920 //match the virtualDestination and cause demand 921 private final ActiveMQDestination activeMQDestination; 922 923 public VirtualConsumerPair(VirtualDestination virtualDestination, 924 ActiveMQDestination activeMQDestination) { 925 super(); 926 this.virtualDestination = virtualDestination; 927 this.activeMQDestination = activeMQDestination; 928 } 929 @Override 930 public int hashCode() { 931 final int prime = 31; 932 int result = 1; 933 result = prime * result + getOuterType().hashCode(); 934 result = prime 935 * result 936 + ((activeMQDestination == null) ? 0 : activeMQDestination 937 .hashCode()); 938 result = prime 939 * result 940 + ((virtualDestination == null) ? 0 : virtualDestination 941 .hashCode()); 942 return result; 943 } 944 @Override 945 public boolean equals(Object obj) { 946 if (this == obj) 947 return true; 948 if (obj == null) 949 return false; 950 if (getClass() != obj.getClass()) 951 return false; 952 VirtualConsumerPair other = (VirtualConsumerPair) obj; 953 if (!getOuterType().equals(other.getOuterType())) 954 return false; 955 if (activeMQDestination == null) { 956 if (other.activeMQDestination != null) 957 return false; 958 } else if (!activeMQDestination.equals(other.activeMQDestination)) 959 return false; 960 if (virtualDestination == null) { 961 if (other.virtualDestination != null) 962 return false; 963 } else if (!virtualDestination.equals(other.virtualDestination)) 964 return false; 965 return true; 966 } 967 private AdvisoryBroker getOuterType() { 968 return AdvisoryBroker.this; 969 } 970 } 971}