001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020 021import org.apache.activemq.broker.region.RegionBroker; 022import org.apache.activemq.broker.region.Subscription; 023import org.apache.activemq.broker.region.TopicRegion; 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.ConsumerId; 026import org.apache.activemq.command.ConsumerInfo; 027import org.apache.activemq.filter.DestinationFilter; 028import org.apache.activemq.transport.Transport; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Consolidates subscriptions 034 */ 035public class DurableConduitBridge extends ConduitBridge { 036 private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class); 037 038 @Override 039 public String toString() { 040 return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName(); 041 } 042 /** 043 * Constructor 044 * 045 * @param configuration 046 * 047 * @param localBroker 048 * @param remoteBroker 049 */ 050 public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, 051 Transport remoteBroker) { 052 super(configuration, localBroker, remoteBroker); 053 } 054 055 /** 056 * Subscriptions for these destinations are always created 057 * 058 */ 059 @Override 060 protected void setupStaticDestinations() { 061 super.setupStaticDestinations(); 062 ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations; 063 if (dests != null) { 064 for (ActiveMQDestination dest : dests) { 065 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) { 066 try { 067 //Filtering by non-empty subscriptions, see AMQ-5875 068 if (dest.isTopic()) { 069 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 070 TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 071 072 String candidateSubName = getSubscriberName(dest); 073 for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { 074 String subName = subscription.getConsumerInfo().getSubscriptionName(); 075 if (subName != null && subName.equals(candidateSubName)) { 076 DemandSubscription sub = createDemandSubscription(dest); 077 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); 078 sub.setStaticallyIncluded(true); 079 addSubscription(sub); 080 break; 081 } 082 } 083 } 084 } catch (IOException e) { 085 LOG.error("Failed to add static destination {}", dest, e); 086 } 087 LOG.trace("Forwarding messages for durable destination: {}", dest); 088 } 089 } 090 } 091 } 092 093 @Override 094 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 095 if (addToAlreadyInterestedConsumers(info)) { 096 return null; // don't want this subscription added 097 } 098 //add our original id to ourselves 099 info.addNetworkConsumerId(info.getConsumerId()); 100 101 if (info.isDurable()) { 102 // set the subscriber name to something reproducible 103 info.setSubscriptionName(getSubscriberName(info.getDestination())); 104 // and override the consumerId with something unique so that it won't 105 // be removed if the durable subscriber (at the other end) goes away 106 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), 107 consumerIdGenerator.getNextSequenceId())); 108 } 109 info.setSelector(null); 110 return doCreateDemandSubscription(info); 111 } 112 113 protected String getSubscriberName(ActiveMQDestination dest) { 114 String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName(); 115 return subscriberName; 116 } 117 118 protected boolean doesConsumerExist(ActiveMQDestination dest) { 119 DestinationFilter filter = DestinationFilter.parseFilter(dest); 120 for (DemandSubscription ds : subscriptionMapByLocalId.values()) { 121 if (filter.matches(ds.getLocalInfo().getDestination())) { 122 return true; 123 } 124 } 125 return false; 126 } 127}