001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt.strategy; 018 019import java.io.IOException; 020import java.util.List; 021 022import org.apache.activemq.ActiveMQPrefetchPolicy; 023import org.apache.activemq.command.ActiveMQDestination; 024import org.apache.activemq.command.ActiveMQTopic; 025import org.apache.activemq.command.ConsumerInfo; 026import org.apache.activemq.command.RemoveSubscriptionInfo; 027import org.apache.activemq.command.Response; 028import org.apache.activemq.command.SubscriptionInfo; 029import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; 030import org.apache.activemq.transport.mqtt.MQTTProtocolException; 031import org.apache.activemq.transport.mqtt.MQTTProtocolSupport; 032import org.apache.activemq.transport.mqtt.MQTTSubscription; 033import org.apache.activemq.transport.mqtt.ResponseHandler; 034import org.fusesource.mqtt.client.QoS; 035import org.fusesource.mqtt.codec.CONNECT; 036 037/** 038 * Default implementation that uses unmapped topic subscriptions. 039 */ 040public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy { 041 042 @Override 043 public void onConnect(CONNECT connect) throws MQTTProtocolException { 044 List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId()); 045 046 if (connect.cleanSession()) { 047 deleteDurableSubs(subs); 048 } else { 049 restoreDurableSubs(subs); 050 } 051 } 052 053 @Override 054 public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { 055 ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName)); 056 057 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 058 consumerInfo.setDestination(destination); 059 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); 060 consumerInfo.setRetroactive(true); 061 consumerInfo.setDispatchAsync(true); 062 // create durable subscriptions only when clean session is false 063 if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { 064 consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); 065 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH); 066 } 067 068 if (protocol.getActiveMQSubscriptionPrefetch() > 0) { 069 consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); 070 } 071 072 return doSubscribe(consumerInfo, topicName, requestedQoS); 073 } 074 075 @Override 076 public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException { 077 078 ActiveMQDestination destination = mqttSubscription.getDestination(); 079 080 // check whether the Topic has been recovered in restoreDurableSubs 081 // mark subscription available for recovery for duplicate subscription 082 if (restoredDurableSubs.remove(destination.getPhysicalName())) { 083 return; 084 } 085 086 super.onReSubscribe(mqttSubscription); 087 } 088 089 @Override 090 public void onUnSubscribe(String topicName) throws MQTTProtocolException { 091 MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName); 092 if (subscription != null) { 093 doUnSubscribe(subscription); 094 095 // check if the durable sub also needs to be removed 096 if (subscription.getConsumerInfo().getSubscriptionName() != null) { 097 // also remove it from restored durable subscriptions set 098 restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); 099 100 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 101 rsi.setConnectionId(protocol.getConnectionId()); 102 rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName()); 103 rsi.setClientId(protocol.getClientId()); 104 protocol.sendToActiveMQ(rsi, new ResponseHandler() { 105 @Override 106 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 107 // ignore failures.. 108 } 109 }); 110 } 111 } 112 } 113}