001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.broker.Broker;
032import org.apache.activemq.broker.BrokerFilter;
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.ConnectionContext;
035import org.apache.activemq.broker.ProducerBrokerExchange;
036import org.apache.activemq.broker.region.BaseDestination;
037import org.apache.activemq.broker.region.Destination;
038import org.apache.activemq.broker.region.DurableTopicSubscription;
039import org.apache.activemq.broker.region.MessageReference;
040import org.apache.activemq.broker.region.RegionBroker;
041import org.apache.activemq.broker.region.Subscription;
042import org.apache.activemq.broker.region.TopicRegion;
043import org.apache.activemq.broker.region.TopicSubscription;
044import org.apache.activemq.broker.region.virtual.VirtualDestination;
045import org.apache.activemq.broker.region.virtual.VirtualTopic;
046import org.apache.activemq.command.ActiveMQDestination;
047import org.apache.activemq.command.ActiveMQMessage;
048import org.apache.activemq.command.ActiveMQTopic;
049import org.apache.activemq.command.BrokerInfo;
050import org.apache.activemq.command.Command;
051import org.apache.activemq.command.ConnectionId;
052import org.apache.activemq.command.ConnectionInfo;
053import org.apache.activemq.command.ConsumerId;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.DestinationInfo;
056import org.apache.activemq.command.Message;
057import org.apache.activemq.command.MessageId;
058import org.apache.activemq.command.ProducerId;
059import org.apache.activemq.command.ProducerInfo;
060import org.apache.activemq.command.RemoveSubscriptionInfo;
061import org.apache.activemq.command.SessionId;
062import org.apache.activemq.filter.DestinationPath;
063import org.apache.activemq.security.SecurityContext;
064import org.apache.activemq.state.ProducerState;
065import org.apache.activemq.usage.Usage;
066import org.apache.activemq.util.IdGenerator;
067import org.apache.activemq.util.LongSequenceGenerator;
068import org.apache.activemq.util.SubscriptionKey;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072/**
073 * This broker filter handles tracking the state of the broker for purposes of
074 * publishing advisory messages to advisory consumers.
075 */
076public class AdvisoryBroker extends BrokerFilter {
077
078    private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
079    private static final IdGenerator ID_GENERATOR = new IdGenerator();
080
081    protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
082
083    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
084    protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>();
085
086    /**
087     * This is a set to track all of the virtual destinations that have been added to the broker so
088     * they can be easily referenced later.
089     */
090    protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>());
091    /**
092     * This is a map to track all consumers that exist on the virtual destination so that we can fire
093     * an advisory later when they go away to remove the demand.
094     */
095    protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>();
096    /**
097     * This is a map to track unique demand for the existence of a virtual destination so we make sure
098     * we don't send duplicate advisories.
099     */
100    protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>();
101
102    protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
103    protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
104    protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
105    protected final ProducerId advisoryProducerId = new ProducerId();
106
107    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
108
109    private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher();
110
111    public AdvisoryBroker(Broker next) {
112        super(next);
113        advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
114    }
115
116    @Override
117    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
118        super.addConnection(context, info);
119
120        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
121        // do not distribute passwords in advisory messages. usernames okay
122        ConnectionInfo copy = info.copy();
123        copy.setPassword("");
124        fireAdvisory(context, topic, copy);
125        connections.put(copy.getConnectionId(), copy);
126    }
127
128    @Override
129    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
130        Subscription answer = super.addConsumer(context, info);
131
132        // Don't advise advisory topics.
133        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
134            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
135            consumersLock.writeLock().lock();
136            try {
137                consumers.put(info.getConsumerId(), info);
138
139                //check if this is a consumer on a destination that matches a virtual destination
140                if (getBrokerService().isUseVirtualDestSubs()) {
141                    for (VirtualDestination virtualDestination : virtualDestinations) {
142                        if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
143                            fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
144                        }
145                    }
146                }
147            } finally {
148                consumersLock.writeLock().unlock();
149            }
150            fireConsumerAdvisory(context, info.getDestination(), topic, info);
151        } else {
152            // We need to replay all the previously collected state objects
153            // for this newly added consumer.
154            if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
155                // Replay the connections.
156                for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) {
157                    ConnectionInfo value = iter.next();
158                    ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
159                    fireAdvisory(context, topic, value, info.getConsumerId());
160                }
161            }
162
163            // We check here whether the Destination is Temporary Destination specific or not since we
164            // can avoid sending advisory messages to the consumer if it only wants Temporary Destination
165            // notifications.  If its not just temporary destination related destinations then we have
166            // to send them all, a composite destination could want both.
167            if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
168                // Replay the temporary destinations.
169                for (DestinationInfo destination : destinations.values()) {
170                    if (destination.getDestination().isTemporary()) {
171                        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
172                        fireAdvisory(context, topic, destination, info.getConsumerId());
173                    }
174                }
175            } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
176                // Replay all the destinations.
177                for (DestinationInfo destination : destinations.values()) {
178                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
179                    fireAdvisory(context, topic, destination, info.getConsumerId());
180                }
181            }
182
183            // Replay the producers.
184            if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
185                for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) {
186                    ProducerInfo value = iter.next();
187                    ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
188                    fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
189                }
190            }
191
192            // Replay the consumers.
193            if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
194                consumersLock.readLock().lock();
195                try {
196                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
197                        ConsumerInfo value = iter.next();
198                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
199                        fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
200                    }
201                } finally {
202                    consumersLock.readLock().unlock();
203                }
204            }
205
206            // Replay the virtual destination consumers.
207            if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
208                for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) {
209                    ConsumerInfo key = iter.next();
210                    ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination());
211                    fireConsumerAdvisory(context, key.getDestination(), topic, key);
212              }
213            }
214
215            // Replay network bridges
216            if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
217                for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) {
218                    BrokerInfo key = iter.next();
219                    ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
220                    fireAdvisory(context, topic, key, null, networkBridges.get(key));
221                }
222            }
223        }
224        return answer;
225    }
226
227    @Override
228    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
229        super.addProducer(context, info);
230
231        // Don't advise advisory topics.
232        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
233            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
234            fireProducerAdvisory(context, info.getDestination(), topic, info);
235            producers.put(info.getProducerId(), info);
236        }
237    }
238
239    @Override
240    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
241        Destination answer = super.addDestination(context, destination, create);
242        if (!AdvisorySupport.isAdvisoryTopic(destination)) {
243            //for queues, create demand if isUseVirtualDestSubsOnCreation is true
244            if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) {
245                //check if this new destination matches a virtual destination that exists
246                for (VirtualDestination virtualDestination : virtualDestinations) {
247                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
248                        fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
249                    }
250                }
251            }
252
253            DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
254            DestinationInfo previous = destinations.putIfAbsent(destination, info);
255            if (previous == null) {
256                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
257                fireAdvisory(context, topic, info);
258            }
259        }
260        return answer;
261    }
262
263    @Override
264    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
265        ActiveMQDestination destination = info.getDestination();
266        next.addDestinationInfo(context, info);
267
268        if (!AdvisorySupport.isAdvisoryTopic(destination)) {
269            DestinationInfo previous = destinations.putIfAbsent(destination, info);
270            if (previous == null) {
271                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
272                fireAdvisory(context, topic, info);
273            }
274        }
275    }
276
277    @Override
278    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
279        super.removeDestination(context, destination, timeout);
280        DestinationInfo info = destinations.remove(destination);
281        if (info != null) {
282
283            //on destination removal, remove all demand if using virtual dest subs
284            if (getBrokerService().isUseVirtualDestSubs()) {
285                for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) {
286                    //find all consumers for this virtual destination
287                    VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo);
288
289                    //find a consumer that matches this virtualDest and destination
290                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
291                        //in case of multiple matches
292                        VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
293                        ConsumerInfo i = brokerConsumerDests.get(key);
294                        if (consumerInfo.equals(i)) {
295                            if (brokerConsumerDests.remove(key) != null) {
296                                fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
297                                break;
298                            }
299                        }
300                    }
301                }
302            }
303
304            // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
305            info = info.copy();
306            info.setDestination(destination);
307            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
308            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
309            fireAdvisory(context, topic, info);
310            ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
311            for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
312                try {
313                    next.removeDestination(context, advisoryDestination, -1);
314                } catch (Exception expectedIfDestinationDidNotExistYet) {
315                }
316            }
317        }
318    }
319
320    @Override
321    public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
322        super.removeDestinationInfo(context, destInfo);
323        DestinationInfo info = destinations.remove(destInfo.getDestination());
324        if (info != null) {
325            // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
326            info = info.copy();
327            info.setDestination(destInfo.getDestination());
328            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
329            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
330            fireAdvisory(context, topic, info);
331            ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
332            for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
333                try {
334                    next.removeDestination(context, advisoryDestination, -1);
335                } catch (Exception expectedIfDestinationDidNotExistYet) {
336                }
337            }
338        }
339    }
340
341    @Override
342    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
343        super.removeConnection(context, info, error);
344
345        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
346        fireAdvisory(context, topic, info.createRemoveCommand());
347        connections.remove(info.getConnectionId());
348    }
349
350    @Override
351    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
352        super.removeConsumer(context, info);
353
354        // Don't advise advisory topics.
355        ActiveMQDestination dest = info.getDestination();
356        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
357            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
358            consumersLock.writeLock().lock();
359            try {
360                consumers.remove(info.getConsumerId());
361
362                //remove the demand for this consumer if it matches a virtual destination
363                if(getBrokerService().isUseVirtualDestSubs()) {
364                    fireVirtualDestinationRemoveAdvisory(context, info);
365                }
366            } finally {
367                consumersLock.writeLock().unlock();
368            }
369            if (!dest.isTemporary() || destinations.containsKey(dest)) {
370                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
371            }
372        }
373    }
374
375    @Override
376    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
377        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
378
379        RegionBroker regionBroker = null;
380        if (next instanceof RegionBroker) {
381            regionBroker = (RegionBroker) next;
382        } else {
383            BrokerService service = next.getBrokerService();
384            regionBroker = (RegionBroker) service.getRegionBroker();
385        }
386
387        if (regionBroker == null) {
388            LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call");
389            throw new IllegalStateException("No RegionBroker found.");
390        }
391
392        DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key);
393
394        super.removeSubscription(context, info);
395
396        if (sub == null) {
397            LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
398            return;
399        }
400
401        ActiveMQDestination dest = sub.getConsumerInfo().getDestination();
402
403        // Don't advise advisory topics.
404        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
405            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
406            fireConsumerAdvisory(context, dest, topic, info);
407        }
408
409    }
410
411    @Override
412    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
413        super.removeProducer(context, info);
414
415        // Don't advise advisory topics.
416        ActiveMQDestination dest = info.getDestination();
417        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
418            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
419            producers.remove(info.getProducerId());
420            if (!dest.isTemporary() || destinations.containsKey(dest)) {
421                fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
422            }
423        }
424    }
425
426    @Override
427    public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
428        super.messageExpired(context, messageReference, subscription);
429        try {
430            if (!messageReference.isAdvisory()) {
431                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
432                ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());
433                Message payload = messageReference.getMessage().copy();
434                payload.clearBody();
435                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
436                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
437                fireAdvisory(context, topic, payload, null, advisoryMessage);
438            }
439        } catch (Exception e) {
440            handleFireFailure("expired", e);
441        }
442    }
443
444    @Override
445    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
446        super.messageConsumed(context, messageReference);
447        try {
448            if (!messageReference.isAdvisory()) {
449                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
450                ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination());
451                Message payload = messageReference.getMessage().copy();
452                payload.clearBody();
453                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
454                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
455                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
456                fireAdvisory(context, topic, payload, null, advisoryMessage);
457            }
458        } catch (Exception e) {
459            handleFireFailure("consumed", e);
460        }
461    }
462
463    @Override
464    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
465        super.messageDelivered(context, messageReference);
466        try {
467            if (!messageReference.isAdvisory()) {
468                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
469                ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination());
470                Message payload = messageReference.getMessage().copy();
471                payload.clearBody();
472                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
473                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
474                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
475                fireAdvisory(context, topic, payload, null, advisoryMessage);
476            }
477        } catch (Exception e) {
478            handleFireFailure("delivered", e);
479        }
480    }
481
482    @Override
483    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
484        super.messageDiscarded(context, sub, messageReference);
485        try {
486            if (!messageReference.isAdvisory()) {
487                BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
488                ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination());
489                Message payload = messageReference.getMessage().copy();
490                payload.clearBody();
491                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
492                if (sub instanceof TopicSubscription) {
493                    advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
494                }
495                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
496                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
497                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
498
499                fireAdvisory(context, topic, payload, null, advisoryMessage);
500            }
501        } catch (Exception e) {
502            handleFireFailure("discarded", e);
503        }
504    }
505
506    @Override
507    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
508        super.slowConsumer(context, destination, subs);
509        try {
510            if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
511                ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
512                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
513                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
514                fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
515            }
516        } catch (Exception e) {
517            handleFireFailure("slow consumer", e);
518        }
519    }
520
521    @Override
522    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) {
523        super.fastProducer(context, producerInfo, destination);
524        try {
525            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
526                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
527                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
528                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
529                fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
530            }
531        } catch (Exception e) {
532            handleFireFailure("fast producer", e);
533        }
534    }
535
    // Generators for the connection, session and consumer ids of the ConsumerInfo objects this
    // broker creates itself when a destination (rather than a real consumer) matches a virtual
    // destination; see fireVirtualDestinationAddAdvisory.
    private final IdGenerator connectionIdGenerator = new IdGenerator("advisory");
    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
539
540    @Override
541    public void virtualDestinationAdded(ConnectionContext context,
542            VirtualDestination virtualDestination) {
543        super.virtualDestinationAdded(context, virtualDestination);
544
545        if (virtualDestinations.add(virtualDestination)) {
546            try {
547                // Don't advise advisory topics.
548                if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
549
550                    //create demand for consumers on virtual destinations
551                    consumersLock.readLock().lock();
552                    try {
553                        //loop through existing destinations to see if any match this newly
554                        //created virtual destination
555                        if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
556                            //for matches that are a queue, fire an advisory for demand
557                            for (ActiveMQDestination destination : destinations.keySet()) {
558                                if(destination.isQueue()) {
559                                    if (virtualDestinationMatcher.matches(virtualDestination, destination)) {
560                                        fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination);
561                                    }
562                                }
563                            }
564                        }
565
566                        //loop through existing consumers to see if any of them are consuming on a destination
567                        //that matches the new virtual destination
568                        for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) {
569                            ConsumerInfo info = iter.next();
570                            if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) {
571                                fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination);
572                            }
573                        }
574                    } finally {
575                        consumersLock.readLock().unlock();
576                    }
577                }
578            } catch (Exception e) {
579                handleFireFailure("virtualDestinationAdded", e);
580            }
581        }
582    }
583
584    private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest,
585            VirtualDestination virtualDestination) throws Exception {
586        //if no consumer info, we need to create one - this is the case when an advisory is fired
587        //because of the existence of a destination matching a virtual destination
588        if (info == null) {
589            //store the virtual destination and the activeMQDestination as a pair so that we can keep track
590            //of all matching forwarded destinations that caused demand
591            VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest);
592            if (brokerConsumerDests.get(pair) == null) {
593                ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
594                SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
595                ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
596                info = new ConsumerInfo(consumerId);
597
598                if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
599                    LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
600                    setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
601                    ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
602
603                    if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
604                        LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
605                        fireConsumerAdvisory(context, info.getDestination(), topic, info);
606                    }
607                }
608            }
609        //this is the case of a real consumer coming online
610        } else {
611            info = info.copy();
612            setConsumerInfoVirtualDest(info, virtualDestination, activeMQDest);
613            ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
614
615            if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
616                fireConsumerAdvisory(context, info.getDestination(), topic, info);
617            }
618        }
619    }
620
621    /**
622     * Sets the virtual destination on the ConsumerInfo
623     * If this is a VirtualTopic then the destination used will be the actual topic subscribed
624     * to in order to track demand properly
625     *
626     * @param info
627     * @param virtualDestination
628     * @param activeMQDest
629     */
630    private void setConsumerInfoVirtualDest(ConsumerInfo info, VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) {
631        info.setDestination(virtualDestination.getVirtualDestination());
632        if (virtualDestination instanceof VirtualTopic) {
633            VirtualTopic vt = (VirtualTopic) virtualDestination;
634            String prefix = vt.getPrefix() != null ? vt.getPrefix() : "";
635            String postfix = vt.getPostfix() != null ? vt.getPostfix() : "";
636            if (prefix.endsWith(".")) {
637                prefix = prefix.substring(0, prefix.length() - 1);
638            }
639            if (postfix.startsWith(".")) {
640                postfix = postfix.substring(1, postfix.length());
641            }
642            ActiveMQDestination prefixDestination = prefix.length() > 0 ? new ActiveMQTopic(prefix) : null;
643            ActiveMQDestination postfixDestination = postfix.length() > 0 ? new ActiveMQTopic(postfix) : null;
644
645            String[] prefixPaths = prefixDestination != null ? prefixDestination.getDestinationPaths() : new String[] {};
646            String[] activeMQDestPaths = activeMQDest.getDestinationPaths();
647            String[] postfixPaths = postfixDestination != null ? postfixDestination.getDestinationPaths() : new String[] {};
648
649            //sanity check
650            if (activeMQDestPaths.length > prefixPaths.length + postfixPaths.length) {
651                String[] topicPath = Arrays.copyOfRange(activeMQDestPaths, 0 + prefixPaths.length,
652                        activeMQDestPaths.length - postfixPaths.length);
653
654                ActiveMQTopic newTopic = new ActiveMQTopic(DestinationPath.toString(topicPath));
655                info.setDestination(newTopic);
656            }
657        }
658    }
659
660    @Override
661    public void virtualDestinationRemoved(ConnectionContext context,
662            VirtualDestination virtualDestination) {
663        super.virtualDestinationRemoved(context, virtualDestination);
664
665        if (virtualDestinations.remove(virtualDestination)) {
666            try {
667                consumersLock.readLock().lock();
668                try {
669                    // remove the demand created by the addition of the virtual destination
670                    if (getBrokerService().isUseVirtualDestSubsOnCreation()) {
671                        if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
672                            for (ConsumerInfo info : virtualDestinationConsumers.keySet()) {
673                                //find all consumers for this virtual destination
674                                if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
675                                    fireVirtualDestinationRemoveAdvisory(context, info);
676                                }
677
678                                //check consumers created for the existence of a destination to see if they
679                                //match the consumerinfo and clean up
680                                for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
681                                    ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
682                                    if (info.equals(i)) {
683                                        brokerConsumerDests.remove(activeMQDest);
684                                    }
685                                }
686                            }
687                        }
688                    }
689                } finally {
690                    consumersLock.readLock().unlock();
691                }
692            } catch (Exception e) {
693                handleFireFailure("virtualDestinationAdded", e);
694            }
695        }
696    }
697
698    private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context,
699            ConsumerInfo info) throws Exception {
700
701        VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
702        if (virtualDestination != null) {
703            ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
704
705            ActiveMQDestination dest = info.getDestination();
706
707            if (!dest.isTemporary() || destinations.containsKey(dest)) {
708                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
709            }
710        }
711    }
712
713    @Override
714    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
715        super.isFull(context, destination, usage);
716        if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
717            try {
718
719                ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
720                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
721                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
722                advisoryMessage.setLongProperty(AdvisorySupport.MSG_PROPERTY_USAGE_COUNT, usage.getUsage());
723                fireAdvisory(context, topic, null, null, advisoryMessage);
724
725            } catch (Exception e) {
726                handleFireFailure("is full", e);
727            }
728        }
729    }
730
731    @Override
732    public void nowMasterBroker() {
733        super.nowMasterBroker();
734        try {
735            ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
736            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
737            ConnectionContext context = new ConnectionContext();
738            context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
739            context.setBroker(getBrokerService().getBroker());
740            fireAdvisory(context, topic, null, null, advisoryMessage);
741        } catch (Exception e) {
742            handleFireFailure("now master broker", e);
743        }
744    }
745
746    @Override
747    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
748                                         Subscription subscription, Throwable poisonCause) {
749        boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
750        if (wasDLQd) {
751            try {
752                if (!messageReference.isAdvisory()) {
753                    BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
754                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination());
755                    Message payload = messageReference.getMessage().copy();
756                    payload.clearBody();
757                    fireAdvisory(context, topic, payload);
758                }
759            } catch (Exception e) {
760                handleFireFailure("add to DLQ", e);
761            }
762        }
763
764        return wasDLQd;
765    }
766
767    @Override
768    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
769        try {
770            if (brokerInfo != null) {
771                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
772                advisoryMessage.setBooleanProperty("started", true);
773                advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
774                advisoryMessage.setStringProperty("remoteIp", remoteIp);
775                networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
776
777                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
778
779                ConnectionContext context = new ConnectionContext();
780                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
781                context.setBroker(getBrokerService().getBroker());
782                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
783            }
784        } catch (Exception e) {
785            handleFireFailure("network bridge started", e);
786        }
787    }
788
789    @Override
790    public void networkBridgeStopped(BrokerInfo brokerInfo) {
791        try {
792            if (brokerInfo != null) {
793                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
794                advisoryMessage.setBooleanProperty("started", false);
795                networkBridges.remove(brokerInfo);
796
797                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
798
799                ConnectionContext context = new ConnectionContext();
800                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
801                context.setBroker(getBrokerService().getBroker());
802                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
803            }
804        } catch (Exception e) {
805            handleFireFailure("network bridge stopped", e);
806        }
807    }
808
809    private void handleFireFailure(String message, Throwable cause) {
810        LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
811        LOG.debug("{} detail: {}", message, cause, cause);
812    }
813
    /**
     * Fires an advisory carrying {@code command} on {@code topic} without a target consumer.
     * Delegates to the four-argument overload, passing null as the target consumer id.
     *
     * @param context the connection context used to send the advisory
     * @param topic   the advisory topic to send to
     * @param command the command attached to the advisory message
     * @throws Exception if sending the advisory fails
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
        fireAdvisory(context, topic, command, null);
    }
817
818    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
819        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
820        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
821    }
822
    /**
     * Fires a consumer advisory without a target consumer.
     * Delegates to the five-argument overload, passing null as the target consumer id.
     *
     * @param context             the connection context used to send the advisory
     * @param consumerDestination the destination whose consumer count is reported
     * @param topic               the advisory topic to send to
     * @param command             the command attached to the advisory message
     * @throws Exception if sending the advisory fails
     */
    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception {
        fireConsumerAdvisory(context, consumerDestination, topic, command, null);
    }
826
827    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
828        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
829        int count = 0;
830        Set<Destination> set = getDestinations(consumerDestination);
831        if (set != null) {
832            for (Destination dest : set) {
833                count += dest.getDestinationStatistics().getConsumers().getCount();
834            }
835        }
836        advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
837
838        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
839    }
840
    /**
     * Fires a producer advisory without a target consumer.
     * Delegates to the five-argument overload, passing null as the target consumer id.
     *
     * @param context             the connection context used to send the advisory
     * @param producerDestination the destination whose producer count is reported
     * @param topic               the advisory topic to send to
     * @param command             the command attached to the advisory message
     * @throws Exception if sending the advisory fails
     */
    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
        fireProducerAdvisory(context, producerDestination, topic, command, null);
    }
844
845    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
846        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
847        int count = 0;
848        if (producerDestination != null) {
849            Set<Destination> set = getDestinations(producerDestination);
850            if (set != null) {
851                for (Destination dest : set) {
852                    count += dest.getDestinationStatistics().getProducers().getCount();
853                }
854            }
855        }
856        advisoryMessage.setIntProperty("producerCount", count);
857        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
858    }
859
860    public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
861        //set properties
862        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
863        String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
864        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
865
866        String url = getBrokerService().getVmConnectorURI().toString();
867        if (getBrokerService().getDefaultSocketURIString() != null) {
868            url = getBrokerService().getDefaultSocketURIString();
869        }
870        advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
871
872        //set the data structure
873        advisoryMessage.setDataStructure(command);
874        advisoryMessage.setPersistent(false);
875        advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
876        advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
877        advisoryMessage.setTargetConsumerId(targetConsumerId);
878        advisoryMessage.setDestination(topic);
879        advisoryMessage.setResponseRequired(false);
880        advisoryMessage.setProducerId(advisoryProducerId);
881        boolean originalFlowControl = context.isProducerFlowControl();
882        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
883        producerExchange.setConnectionContext(context);
884        producerExchange.setMutable(true);
885        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
886        try {
887            context.setProducerFlowControl(false);
888            next.send(producerExchange, advisoryMessage);
889        } finally {
890            context.setProducerFlowControl(originalFlowControl);
891        }
892    }
893
    /**
     * Returns the map of connections tracked by this broker, keyed by connection id.
     * The map itself is returned, not a copy.
     *
     * @return the tracked connections
     */
    public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
        return connections;
    }
897
898    public Collection<ConsumerInfo> getAdvisoryConsumers() {
899        consumersLock.readLock().lock();
900        try {
901            return new ArrayList<ConsumerInfo>(consumers.values());
902        } finally {
903            consumersLock.readLock().unlock();
904        }
905    }
906
    /**
     * Returns the map of producers tracked by this broker, keyed by producer id.
     * The map itself is returned, not a copy.
     *
     * @return the tracked producers
     */
    public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
        return producers;
    }
910
    /**
     * Returns the map of destinations tracked by this broker, keyed by destination.
     * The map itself is returned, not a copy.
     *
     * @return the tracked destinations
     */
    public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
        return destinations;
    }
914
915    private class VirtualConsumerPair {
916        private final VirtualDestination virtualDestination;
917
918        //destination that matches this virtualDestination as part target
919        //this is so we can keep track of more than one destination that might
920        //match the virtualDestination and cause demand
921        private final ActiveMQDestination activeMQDestination;
922
923        public VirtualConsumerPair(VirtualDestination virtualDestination,
924                ActiveMQDestination activeMQDestination) {
925            super();
926            this.virtualDestination = virtualDestination;
927            this.activeMQDestination = activeMQDestination;
928        }
929        @Override
930        public int hashCode() {
931            final int prime = 31;
932            int result = 1;
933            result = prime * result + getOuterType().hashCode();
934            result = prime
935                    * result
936                    + ((activeMQDestination == null) ? 0 : activeMQDestination
937                            .hashCode());
938            result = prime
939                    * result
940                    + ((virtualDestination == null) ? 0 : virtualDestination
941                            .hashCode());
942            return result;
943        }
944        @Override
945        public boolean equals(Object obj) {
946            if (this == obj)
947                return true;
948            if (obj == null)
949                return false;
950            if (getClass() != obj.getClass())
951                return false;
952            VirtualConsumerPair other = (VirtualConsumerPair) obj;
953            if (!getOuterType().equals(other.getOuterType()))
954                return false;
955            if (activeMQDestination == null) {
956                if (other.activeMQDestination != null)
957                    return false;
958            } else if (!activeMQDestination.equals(other.activeMQDestination))
959                return false;
960            if (virtualDestination == null) {
961                if (other.virtualDestination != null)
962                    return false;
963            } else if (!virtualDestination.equals(other.virtualDestination))
964                return false;
965            return true;
966        }
967        private AdvisoryBroker getOuterType() {
968            return AdvisoryBroker.this;
969        }
970    }
971}