/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt.strategy;

import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.apache.activemq.transport.mqtt.ResponseHandler;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.CONNECT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscription strategy that converts all MQTT subscribes that would be durable to
 * Virtual Topic Queue subscriptions. Also maps all publish requests to be prefixed
 * with the VirtualTopic. prefix unless already present.
 * <p>
 * Queues created by this strategy are named
 * {@code Consumer.<clientId>:<QoS>.VirtualTopic.<topic>}; that layout is produced in
 * {@link #onSubscribe(String, QoS)} and parsed back when subscriptions are restored
 * on connect.
 */
public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {

    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";

    private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);

    /**
     * Queues that were re-subscribed by {@link #restoreDurableQueue(List)}. An entry is
     * removed again by {@link #onReSubscribe(MQTTSubscription)}, which then skips the
     * duplicate subscribe for that queue.
     */
    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());

    /**
     * Looks up the queues and durable subscriptions recorded for the connecting client
     * and either deletes them (clean session) or restores them.
     *
     * @param connect the CONNECT frame; only its clean-session flag is consulted here.
     *
     * @throws MQTTProtocolException if the existing queues or subscriptions cannot be looked up.
     */
    @Override
    public void onConnect(CONNECT connect) throws MQTTProtocolException {
        List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());
        List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());

        // When clean session is true we must purge all of the client's old Queue subscriptions
        // and any durable subscriptions created on the VirtualTopic instance as well.

        if (connect.cleanSession()) {
            deleteDurableQueues(queues);
            deleteDurableSubs(subs);
        } else {
            restoreDurableQueue(queues);
            restoreDurableSubs(subs);
        }
    }

    /**
     * Creates the consumer for an MQTT SUBSCRIBE.
     * <p>
     * For a non-clean session with a client id and a QoS of at least
     * {@code AT_LEAST_ONCE} the subscription is either a named topic subscription
     * (when the converted name already starts with {@code VirtualTopic.}) or a
     * {@code Consumer.<clientId>:<QoS>.VirtualTopic.<topic>} queue. In every other
     * case a plain topic consumer on the {@code VirtualTopic.}-prefixed name is used.
     *
     * @param topicName    the MQTT topic filter requested by the client.
     * @param requestedQoS the QoS requested by the client.
     *
     * @return the value returned by {@code doSubscribe} for the created consumer.
     *
     * @throws MQTTProtocolException if the subscribe fails.
     */
    @Override
    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
        final ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
        String converted = convertMQTTToActiveMQ(topicName);

        final ActiveMQDestination destination;
        final int prefetch;
        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
            if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
                destination = new ActiveMQTopic(converted);
                prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
                consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
            } else {
                // This layout is what restoreDurableQueue() parses on reconnect.
                converted = VIRTUALTOPIC_CONSUMER_PREFIX +
                            convertMQTTToActiveMQ(protocol.getClientId()) + ":" + requestedQoS + "." +
                            VIRTUALTOPIC_PREFIX + converted;
                destination = new ActiveMQQueue(converted);
                prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
            }
        } else {
            if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
                converted = VIRTUALTOPIC_PREFIX + converted;
            }
            destination = new ActiveMQTopic(converted);
            prefetch = ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
        }

        consumerInfo.setDestination(destination);
        // A positive prefetch configured on the protocol overrides the per-type default.
        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
            consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
        } else {
            consumerInfo.setPrefetchSize(prefetch);
        }
        consumerInfo.setRetroactive(true);
        consumerInfo.setDispatchAsync(true);

        return doSubscribe(consumerInfo, topicName, requestedQoS);
    }

    /**
     * Handles a repeated SUBSCRIBE for a subscription that already exists.
     * <p>
     * If the destination was restored during connect, the restored marker is consumed
     * and nothing else happens. Otherwise topics are delegated to the superclass, and
     * queues are unsubscribed and subscribed again under a new consumer id.
     *
     * @param mqttSubscription the existing subscription being re-requested.
     *
     * @throws MQTTProtocolException if the unsubscribe or subscribe fails.
     */
    @Override
    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {

        ActiveMQDestination destination = mqttSubscription.getDestination();

        // check whether the Queue has been recovered in restoreDurableQueue
        // mark subscription available for recovery for duplicate subscription
        if (destination.isQueue() && restoredQueues.remove(destination)) {
            return;
        }

        // check whether the Topic has been recovered in restoreDurableSubs
        // mark subscription available for recovery for duplicate subscription
        if (destination.isTopic() && restoredDurableSubs.remove(destination.getPhysicalName())) {
            return;
        }

        if (destination.isTopic()) {
            super.onReSubscribe(mqttSubscription);
        } else {
            doUnSubscribe(mqttSubscription);
            ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
            consumerInfo.setConsumerId(getNextConsumerId());
            doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
        }
    }

    /**
     * Removes the subscription registered for the given topic, if any.
     * <p>
     * For a queue subscription a remove-destination command is sent for the queue;
     * for a topic subscription that carries a subscription name a
     * {@link RemoveSubscriptionInfo} is sent. Responses to both are ignored.
     *
     * @param topicName the MQTT topic filter being unsubscribed.
     *
     * @throws MQTTProtocolException if the unsubscribe fails.
     */
    @Override
    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
        if (subscription != null) {
            doUnSubscribe(subscription);
            if (subscription.getDestination().isQueue()) {
                DestinationInfo remove = new DestinationInfo();
                remove.setConnectionId(protocol.getConnectionId());
                remove.setDestination(subscription.getDestination());
                remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);

                protocol.sendToActiveMQ(remove, ignoreResponse());
            } else if (subscription.getConsumerInfo().getSubscriptionName() != null) {
                // also remove it from restored durable subscriptions set
                restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));

                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
                rsi.setConnectionId(protocol.getConnectionId());
                rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
                rsi.setClientId(protocol.getClientId());
                protocol.sendToActiveMQ(rsi, ignoreResponse());
            }
        }
    }

    /**
     * Maps a publish topic name to a topic destination, adding the
     * {@code VirtualTopic.} prefix unless the name already starts with it.
     *
     * @param topicName the topic name to publish to.
     *
     * @return the topic destination to publish to.
     */
    @Override
    public ActiveMQDestination onSend(String topicName) {
        if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
            return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
        } else {
            return new ActiveMQTopic(topicName);
        }
    }

    /**
     * Maps a destination back to a topic name by dropping everything up to and
     * including the first occurrence of {@code VirtualTopic.} in its physical name.
     * Because the prefix is searched for rather than anchored at the start, this also
     * strips the leading {@code Consumer.<clientId>:<QoS>.} part of the queue names
     * built by {@link #onSubscribe(String, QoS)}.
     *
     * @param destination the destination a message was received on.
     *
     * @return the remaining name, or the unchanged physical name when the prefix is absent.
     */
    @Override
    public String onSend(ActiveMQDestination destination) {
        String destinationName = destination.getPhysicalName();
        int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
        if (position >= 0) {
            destinationName = destinationName.substring(position + VIRTUALTOPIC_PREFIX.length());
        }
        return destinationName;
    }

    /**
     * Tells whether the destination name starts with {@code $}, either directly or
     * right after the {@code VirtualTopic.} prefix.
     *
     * @param destination the destination to test.
     *
     * @return true if the physical name starts with {@code $} or {@code VirtualTopic.$}.
     */
    @Override
    public boolean isControlTopic(ActiveMQDestination destination) {
        String destinationName = destination.getPhysicalName();
        return destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$");
    }

    /**
     * Creates a handler for commands whose outcome is deliberately not inspected.
     * A new instance is returned on every call.
     */
    private ResponseHandler ignoreResponse() {
        return new ResponseHandler() {
            @Override
            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
                // ignore failures..
            }
        };
    }

    /**
     * Sends a remove-destination command for each of the given queues. This is best
     * effort: responses are ignored and any failure while sending is logged and not
     * propagated, which also ends the loop for the remaining queues.
     */
    private void deleteDurableQueues(List<ActiveMQQueue> queues) {
        try {
            for (ActiveMQQueue queue : queues) {
                LOG.debug("Removing queue subscription for {} ",queue.getPhysicalName());
                DestinationInfo removeAction = new DestinationInfo();
                removeAction.setConnectionId(protocol.getConnectionId());
                removeAction.setDestination(queue);
                removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);

                protocol.sendToActiveMQ(removeAction, ignoreResponse());
            }
        } catch (Throwable e) {
            LOG.warn("Could not delete the MQTT queue subscriptions.", e);
        }
    }

    /**
     * Re-subscribes to each of the given queues and records it in {@link #restoredQueues}.
     * <p>
     * The MQTT topic name and QoS are recovered from the queue name, which is expected
     * to look like {@code Consumer.<clientId>:<QoS>.VirtualTopic.<topic>}. A queue whose
     * name does not parse is logged and skipped so that it cannot fail the connect or
     * block the remaining queues. An {@link IOException} while subscribing is logged and
     * ends the restore.
     */
    private void restoreDurableQueue(List<ActiveMQQueue> queues) {
        try {
            for (ActiveMQQueue queue : queues) {
                final String topicName;
                final QoS qoS;
                try {
                    String name = queue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length());
                    StringTokenizer tokenizer = new StringTokenizer(name);
                    // First token (up to ':' or '.') is the client id; it is not needed here.
                    tokenizer.nextToken(":.");
                    String qosString = tokenizer.nextToken();
                    // Skip the "VirtualTopic" segment.
                    tokenizer.nextToken();
                    // An empty delimiter set yields the rest of the name; drop its leading '.'.
                    topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
                    qoS = QoS.valueOf(qosString);
                } catch (RuntimeException e) {
                    // Missing tokens, an empty remainder or an unknown QoS name all end up here.
                    LOG.warn("Skipping queue " + queue.getPhysicalName()
                        + ": name does not match the expected MQTT subscription layout.", e);
                    continue;
                }
                LOG.trace("Restoring queue subscription: {}:{}", topicName, qoS);

                ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
                consumerInfo.setDestination(queue);
                consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
                if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
                    consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
                }
                consumerInfo.setRetroactive(true);
                consumerInfo.setDispatchAsync(true);

                doSubscribe(consumerInfo, topicName, qoS);

                // mark this durable subscription as restored by Broker
                restoredQueues.add(queue);
            }
        } catch (IOException e) {
            LOG.warn("Could not restore the MQTT queue subscriptions.", e);
        }
    }

    /**
     * Collects the non-temporary queues in the broker's queue region whose physical
     * name starts with {@code Consumer.<clientId>:}.
     *
     * @param clientId the client id to match queue names against.
     *
     * @return the matching queues; empty if there are none.
     *
     * @throws MQTTProtocolException if the {@link RegionBroker} cannot be obtained.
     */
    List<ActiveMQQueue> lookupQueues(String clientId) throws MQTTProtocolException {
        List<ActiveMQQueue> result = new ArrayList<ActiveMQQueue>();
        RegionBroker regionBroker;

        try {
            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
        } catch (Exception e) {
            throw new MQTTProtocolException("Error recovering queues: " + e.getMessage(), false, e);
        }

        // NOTE(review): onSubscribe() builds the queue name from
        // convertMQTTToActiveMQ(clientId) while this prefix uses the raw client id;
        // confirm the two agree for client ids that the conversion alters.
        final String queuePrefix = VIRTUALTOPIC_CONSUMER_PREFIX + clientId + ":";

        final QueueRegion queueRegion = (QueueRegion) regionBroker.getQueueRegion();
        for (ActiveMQDestination destination : queueRegion.getDestinationMap().keySet()) {
            if (destination.isQueue() && !destination.isTemporary()) {
                if (destination.getPhysicalName().startsWith(queuePrefix)) {
                    LOG.debug("Recovered client sub: {} on connect", destination.getPhysicalName());
                    result.add((ActiveMQQueue) destination);
                }
            }
        }

        return result;
    }
}