001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.zip.DataFormatException;
025import java.util.zip.Inflater;
026
027import javax.jms.Destination;
028import javax.jms.InvalidClientIDException;
029import javax.jms.JMSException;
030import javax.jms.Message;
031import javax.security.auth.login.CredentialException;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
036import org.apache.activemq.command.ActiveMQBytesMessage;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMapMessage;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTextMessage;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionError;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.ConnectionInfo;
045import org.apache.activemq.command.ExceptionResponse;
046import org.apache.activemq.command.MessageAck;
047import org.apache.activemq.command.MessageDispatch;
048import org.apache.activemq.command.MessageId;
049import org.apache.activemq.command.ProducerId;
050import org.apache.activemq.command.ProducerInfo;
051import org.apache.activemq.command.Response;
052import org.apache.activemq.command.SessionId;
053import org.apache.activemq.command.SessionInfo;
054import org.apache.activemq.command.ShutdownInfo;
055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
056import org.apache.activemq.util.ByteArrayOutputStream;
057import org.apache.activemq.util.ByteSequence;
058import org.apache.activemq.util.FactoryFinder;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IdGenerator;
061import org.apache.activemq.util.JMSExceptionSupport;
062import org.apache.activemq.util.LRUCache;
063import org.apache.activemq.util.LongSequenceGenerator;
064import org.fusesource.hawtbuf.Buffer;
065import org.fusesource.hawtbuf.UTF8Buffer;
066import org.fusesource.mqtt.client.QoS;
067import org.fusesource.mqtt.client.Topic;
068import org.fusesource.mqtt.codec.CONNACK;
069import org.fusesource.mqtt.codec.CONNECT;
070import org.fusesource.mqtt.codec.DISCONNECT;
071import org.fusesource.mqtt.codec.MQTTFrame;
072import org.fusesource.mqtt.codec.PINGREQ;
073import org.fusesource.mqtt.codec.PINGRESP;
074import org.fusesource.mqtt.codec.PUBACK;
075import org.fusesource.mqtt.codec.PUBCOMP;
076import org.fusesource.mqtt.codec.PUBLISH;
077import org.fusesource.mqtt.codec.PUBREC;
078import org.fusesource.mqtt.codec.PUBREL;
079import org.fusesource.mqtt.codec.SUBACK;
080import org.fusesource.mqtt.codec.SUBSCRIBE;
081import org.fusesource.mqtt.codec.UNSUBACK;
082import org.fusesource.mqtt.codec.UNSUBSCRIBE;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086public class MQTTProtocolConverter {
087
088    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
089
090    public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
091    public static final int V3_1 = 3;
092    public static final int V3_1_1 = 4;
093
094    public static final String SINGLE_LEVEL_WILDCARD = "+";
095    public static final String MULTI_LEVEL_WILDCARD = "#";
096
097    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
098    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
099    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
100    static final int DEFAULT_CACHE_SIZE = 5000;
101
102    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
103    private final SessionId sessionId = new SessionId(connectionId, -1);
104    private final ProducerId producerId = new ProducerId(sessionId, 1);
105    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
106
107    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
108    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
109    private final Map<ActiveMQDestination, String> mqttTopicMap = new LRUCache<ActiveMQDestination, String>(DEFAULT_CACHE_SIZE);
110
111    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
112    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
113
114    private final MQTTTransport mqttTransport;
115    private final BrokerService brokerService;
116
117    private final Object commnadIdMutex = new Object();
118    private int lastCommandId;
119    private final AtomicBoolean connected = new AtomicBoolean(false);
120    private final ConnectionInfo connectionInfo = new ConnectionInfo();
121    private CONNECT connect;
122    private String clientId;
123    private long defaultKeepAlive;
124    private int activeMQSubscriptionPrefetch = -1;
125    private final MQTTPacketIdGenerator packetIdGenerator;
126    private boolean publishDollarTopics;
127
128    public int version;
129
130    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
131
132    /*
133     * Subscription strategy configuration element.
134     *   > mqtt-default-subscriptions
135     *   > mqtt-virtual-topic-subscriptions
136     */
137    private String subscriptionStrategyName = "mqtt-default-subscriptions";
138    private MQTTSubscriptionStrategy subsciptionStrategy;
139
140    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
141        this.mqttTransport = mqttTransport;
142        this.brokerService = brokerService;
143        this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
144        this.defaultKeepAlive = 0;
145    }
146
147    int generateCommandId() {
148        synchronized (commnadIdMutex) {
149            return lastCommandId++;
150        }
151    }
152
153    public void sendToActiveMQ(Command command, ResponseHandler handler) {
154
155        // Lets intercept message send requests..
156        if (command instanceof ActiveMQMessage) {
157            ActiveMQMessage msg = (ActiveMQMessage) command;
158            try {
159                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) {
160                    // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
161                    // specification requirements for system assigned destinations.
162                    if (handler != null) {
163                        try {
164                            handler.onResponse(this, new Response());
165                        } catch (IOException e) {
166                            LOG.warn("Failed to send command " + command, e);
167                        }
168                    }
169                    return;
170                }
171            } catch (IOException e) {
172                LOG.warn("Failed to send command " + command, e);
173            }
174        }
175
176        command.setCommandId(generateCommandId());
177        if (handler != null) {
178            command.setResponseRequired(true);
179            resposeHandlers.put(command.getCommandId(), handler);
180        }
181        getMQTTTransport().sendToActiveMQ(command);
182    }
183
184    void sendToMQTT(MQTTFrame frame) {
185        try {
186            mqttTransport.sendToMQTT(frame);
187        } catch (IOException e) {
188            LOG.warn("Failed to send frame " + frame, e);
189        }
190    }
191
192    /**
193     * Convert a MQTT command
194     */
195    public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
196        switch (frame.messageType()) {
197            case PINGREQ.TYPE:
198                LOG.debug("Received a ping from client: " + getClientId());
199                sendToMQTT(PING_RESP_FRAME);
200                LOG.debug("Sent Ping Response to " + getClientId());
201                break;
202            case CONNECT.TYPE:
203                CONNECT connect = new CONNECT().decode(frame);
204                onMQTTConnect(connect);
205                LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
206                break;
207            case DISCONNECT.TYPE:
208                LOG.debug("MQTT Client {} disconnecting", getClientId());
209                onMQTTDisconnect();
210                break;
211            case SUBSCRIBE.TYPE:
212                onSubscribe(new SUBSCRIBE().decode(frame));
213                break;
214            case UNSUBSCRIBE.TYPE:
215                onUnSubscribe(new UNSUBSCRIBE().decode(frame));
216                break;
217            case PUBLISH.TYPE:
218                onMQTTPublish(new PUBLISH().decode(frame));
219                break;
220            case PUBACK.TYPE:
221                onMQTTPubAck(new PUBACK().decode(frame));
222                break;
223            case PUBREC.TYPE:
224                onMQTTPubRec(new PUBREC().decode(frame));
225                break;
226            case PUBREL.TYPE:
227                onMQTTPubRel(new PUBREL().decode(frame));
228                break;
229            case PUBCOMP.TYPE:
230                onMQTTPubComp(new PUBCOMP().decode(frame));
231                break;
232            default:
233                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
234        }
235    }
236
237    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
238        if (connected.get()) {
239            throw new MQTTProtocolException("Already connected.");
240        }
241        this.connect = connect;
242
243        // The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01
244        // (unacceptable protocol level) and then disconnect the Client if the Protocol Level
245        // is not supported by the Server [MQTT-3.1.2-2].
246        if (connect.version() < 3 || connect.version() > 4) {
247            CONNACK ack = new CONNACK();
248            ack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION);
249            try {
250                getMQTTTransport().sendToMQTT(ack.encode());
251                getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null));
252            } catch (IOException e) {
253                getMQTTTransport().onException(IOExceptionSupport.create(e));
254            }
255            return;
256        }
257
258        String clientId = "";
259        if (connect.clientId() != null) {
260            clientId = connect.clientId().toString();
261        }
262
263        String userName = null;
264        if (connect.userName() != null) {
265            userName = connect.userName().toString();
266        }
267        String passswd = null;
268        if (connect.password() != null) {
269            passswd = connect.password().toString();
270        }
271
272        version = connect.version();
273
274        configureInactivityMonitor(connect.keepAlive());
275
276        connectionInfo.setConnectionId(connectionId);
277        if (clientId != null && !clientId.isEmpty()) {
278            connectionInfo.setClientId(clientId);
279        } else {
280            // Clean Session MUST be set for 0 length Client Id
281            if (!connect.cleanSession()) {
282                CONNACK ack = new CONNACK();
283                ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
284                try {
285                    getMQTTTransport().sendToMQTT(ack.encode());
286                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
287                } catch (IOException e) {
288                    getMQTTTransport().onException(IOExceptionSupport.create(e));
289                }
290                return;
291            }
292            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
293        }
294
295        connectionInfo.setResponseRequired(true);
296        connectionInfo.setUserName(userName);
297        connectionInfo.setPassword(passswd);
298        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
299
300        sendToActiveMQ(connectionInfo, new ResponseHandler() {
301            @Override
302            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
303
304                if (response.isException()) {
305                    // If the connection attempt fails we close the socket.
306                    Throwable exception = ((ExceptionResponse) response).getException();
307                    //let the client know
308                    CONNACK ack = new CONNACK();
309                    if (exception instanceof InvalidClientIDException) {
310                        ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
311                    } else if (exception instanceof SecurityException) {
312                        ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
313                    } else if (exception instanceof CredentialException) {
314                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
315                    } else {
316                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
317                    }
318                    getMQTTTransport().sendToMQTT(ack.encode());
319                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
320                    return;
321                }
322
323                final SessionInfo sessionInfo = new SessionInfo(sessionId);
324                sendToActiveMQ(sessionInfo, null);
325
326                final ProducerInfo producerInfo = new ProducerInfo(producerId);
327                sendToActiveMQ(producerInfo, new ResponseHandler() {
328                    @Override
329                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
330
331                        if (response.isException()) {
332                            // If the connection attempt fails we close the socket.
333                            Throwable exception = ((ExceptionResponse) response).getException();
334                            CONNACK ack = new CONNACK();
335                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
336                            getMQTTTransport().sendToMQTT(ack.encode());
337                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
338                            return;
339                        }
340
341                        CONNACK ack = new CONNACK();
342                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
343                        connected.set(true);
344                        getMQTTTransport().sendToMQTT(ack.encode());
345
346                        if (connect.cleanSession()) {
347                            packetIdGenerator.stopClientSession(getClientId());
348                        } else {
349                            packetIdGenerator.startClientSession(getClientId());
350                        }
351
352                        findSubscriptionStrategy().onConnect(connect);
353                    }
354                });
355            }
356        });
357    }
358
359    void onMQTTDisconnect() throws MQTTProtocolException {
360        if (connected.get()) {
361            connected.set(false);
362            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
363            sendToActiveMQ(new ShutdownInfo(), null);
364        }
365        stopTransport();
366    }
367
368    void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
369        checkConnected();
370        LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}",
371                  command.messageId(), clientId, connectionInfo.getConnectionId());
372        Topic[] topics = command.topics();
373        if (topics != null) {
374            byte[] qos = new byte[topics.length];
375            for (int i = 0; i < topics.length; i++) {
376                try {
377                    qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
378                } catch (IOException e) {
379                    throw new MQTTProtocolException("Failed to process subscription request", true, e);
380                }
381            }
382            SUBACK ack = new SUBACK();
383            ack.messageId(command.messageId());
384            ack.grantedQos(qos);
385            try {
386                getMQTTTransport().sendToMQTT(ack.encode());
387            } catch (IOException e) {
388                LOG.warn("Couldn't send SUBACK for " + command, e);
389            }
390        } else {
391            LOG.warn("No topics defined for Subscription " + command);
392        }
393    }
394
395    public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
396        checkConnected();
397        if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) {
398            throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS"));
399        }
400        UTF8Buffer[] topics = command.topics();
401        if (topics != null) {
402            for (UTF8Buffer topic : topics) {
403                try {
404                    findSubscriptionStrategy().onUnSubscribe(topic.toString());
405                } catch (IOException e) {
406                    throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
407                }
408            }
409        }
410        UNSUBACK ack = new UNSUBACK();
411        ack.messageId(command.messageId());
412        sendToMQTT(ack.encode());
413    }
414
415    /**
416     * Dispatch an ActiveMQ command
417     */
418    public void onActiveMQCommand(Command command) throws Exception {
419        if (command.isResponse()) {
420            Response response = (Response) command;
421            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
422            if (rh != null) {
423                rh.onResponse(this, response);
424            } else {
425                // Pass down any unexpected errors. Should this close the connection?
426                if (response.isException()) {
427                    Throwable exception = ((ExceptionResponse) response).getException();
428                    handleException(exception, null);
429                }
430            }
431        } else if (command.isMessageDispatch()) {
432            MessageDispatch md = (MessageDispatch) command;
433            MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
434            if (sub != null) {
435                MessageAck ack = sub.createMessageAck(md);
436                PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
437                switch (publish.qos()) {
438                    case AT_LEAST_ONCE:
439                    case EXACTLY_ONCE:
440                        publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
441                    case AT_MOST_ONCE:
442                }
443                if (ack != null && sub.expectAck(publish)) {
444                    synchronized (consumerAcks) {
445                        consumerAcks.put(publish.messageId(), ack);
446                    }
447                }
448                LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}",
449                          publish.messageId(), clientId, connectionInfo.getConnectionId());
450                getMQTTTransport().sendToMQTT(publish.encode());
451                if (ack != null && !sub.expectAck(publish)) {
452                    getMQTTTransport().sendToActiveMQ(ack);
453                }
454            }
455        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
456            // Pass down any unexpected async errors. Should this close the connection?
457            Throwable exception = ((ConnectionError) command).getException();
458            handleException(exception, null);
459        } else if (command.isBrokerInfo()) {
460            //ignore
461        } else {
462            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
463        }
464    }
465
466    void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
467        checkConnected();
468        LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
469                  command.messageId(), clientId, connectionInfo.getConnectionId());
470        //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet
471        if (containsMqttWildcard(command.topicName().toString())) {
472            // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters
473            getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null));
474            return;
475        }
476        ActiveMQMessage message = convertMessage(command);
477        message.setProducerId(producerId);
478        message.onSend();
479        sendToActiveMQ(message, createResponseHandler(command));
480    }
481
482    void onMQTTPubAck(PUBACK command) {
483        short messageId = command.messageId();
484        LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}",
485                  messageId, clientId, connectionInfo.getConnectionId());
486        packetIdGenerator.ackPacketId(getClientId(), messageId);
487        MessageAck ack;
488        synchronized (consumerAcks) {
489            ack = consumerAcks.remove(messageId);
490        }
491        if (ack != null) {
492            getMQTTTransport().sendToActiveMQ(ack);
493        }
494    }
495
496    void onMQTTPubRec(PUBREC commnand) {
497        //from a subscriber - send a PUBREL in response
498        PUBREL pubrel = new PUBREL();
499        pubrel.messageId(commnand.messageId());
500        sendToMQTT(pubrel.encode());
501    }
502
503    void onMQTTPubRel(PUBREL command) {
504        PUBREC ack;
505        synchronized (publisherRecs) {
506            ack = publisherRecs.remove(command.messageId());
507        }
508        if (ack == null) {
509            LOG.warn("Unknown PUBREL: {} received", command.messageId());
510        }
511        PUBCOMP pubcomp = new PUBCOMP();
512        pubcomp.messageId(command.messageId());
513        sendToMQTT(pubcomp.encode());
514    }
515
516    void onMQTTPubComp(PUBCOMP command) {
517        short messageId = command.messageId();
518        packetIdGenerator.ackPacketId(getClientId(), messageId);
519        MessageAck ack;
520        synchronized (consumerAcks) {
521            ack = consumerAcks.remove(messageId);
522        }
523        if (ack != null) {
524            getMQTTTransport().sendToActiveMQ(ack);
525        }
526    }
527
528    ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
529        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
530
531        msg.setProducerId(producerId);
532        MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
533        msg.setMessageId(id);
534        LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
535                command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
536        msg.setTimestamp(System.currentTimeMillis());
537        msg.setPriority((byte) Message.DEFAULT_PRIORITY);
538        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
539        msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
540        if (command.retain()) {
541            msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
542        }
543
544        ActiveMQDestination destination;
545        synchronized (activeMQDestinationMap) {
546            destination = activeMQDestinationMap.get(command.topicName());
547            if (destination == null) {
548                String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
549                try {
550                    destination = findSubscriptionStrategy().onSend(topicName);
551                } catch (IOException e) {
552                    throw JMSExceptionSupport.create(e);
553                }
554
555                activeMQDestinationMap.put(command.topicName().toString(), destination);
556            }
557        }
558
559        msg.setJMSDestination(destination);
560        msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
561        return msg;
562    }
563
564    public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
565        PUBLISH result = new PUBLISH();
566        // packet id is set in MQTTSubscription
567        QoS qoS;
568        if (message.propertyExists(QOS_PROPERTY_NAME)) {
569            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
570            qoS = QoS.values()[ordinal];
571
572        } else {
573            qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
574        }
575        result.qos(qoS);
576        if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
577            result.retain(true);
578        }
579
580        String topicName;
581        synchronized (mqttTopicMap) {
582            ActiveMQDestination destination = message.getDestination();
583            if (destination.isPattern() && message.getOriginalDestination() != null) {
584                destination = message.getOriginalDestination();
585            }
586            topicName = mqttTopicMap.get(destination);
587            if (topicName == null) {
588                String amqTopicName = findSubscriptionStrategy().onSend(destination);
589                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
590                mqttTopicMap.put(destination, topicName);
591            }
592        }
593        result.topicName(new UTF8Buffer(topicName));
594
595        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
596            ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
597            msg.setReadOnlyBody(true);
598            String messageText = msg.getText();
599            if (messageText != null) {
600                result.payload(new Buffer(messageText.getBytes("UTF-8")));
601            }
602        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
603            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
604            msg.setReadOnlyBody(true);
605            byte[] data = new byte[(int) msg.getBodyLength()];
606            msg.readBytes(data);
607            result.payload(new Buffer(data));
608        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
609            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
610            msg.setReadOnlyBody(true);
611            Map<String, Object> map = msg.getContentMap();
612            if (map != null) {
613                result.payload(new Buffer(map.toString().getBytes("UTF-8")));
614            }
615        } else {
616            ByteSequence byteSequence = message.getContent();
617            if (byteSequence != null && byteSequence.getLength() > 0) {
618                if (message.isCompressed()) {
619                    Inflater inflater = new Inflater();
620                    inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
621                    byte[] data = new byte[4096];
622                    int read;
623                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
624                    while ((read = inflater.inflate(data)) != 0) {
625                        bytesOut.write(data, 0, read);
626                    }
627                    byteSequence = bytesOut.toByteSequence();
628                    bytesOut.close();
629                }
630                result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
631            }
632        }
633        LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
634                result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId());
635        return result;
636    }
637
638    public MQTTTransport getMQTTTransport() {
639        return mqttTransport;
640    }
641
642    AtomicBoolean transportErrorHandled = new AtomicBoolean(false);
643    public void onTransportError() {
644        if (transportErrorHandled.compareAndSet(false, true)) {
645            if (connect != null) {
646                if (connected.get()) {
647                    if (connect.willTopic() != null && connect.willMessage() != null) {
648                        try {
649                            PUBLISH publish = new PUBLISH();
650                            publish.topicName(connect.willTopic());
651                            publish.qos(connect.willQos());
652                            publish.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
653                            publish.payload(connect.willMessage());
654                            publish.retain(connect.willRetain());
655                            ActiveMQMessage message = convertMessage(publish);
656                            message.setProducerId(producerId);
657                            message.onSend();
658
659                            sendToActiveMQ(message, null);
660                        } catch (Exception e) {
661                            LOG.warn("Failed to publish Will Message " + connect.willMessage());
662                        }
663                    }
664                    // remove connection info
665                    sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
666                }
667            }
668        }
669    }
670
671    void configureInactivityMonitor(short keepAliveSeconds) {
672        MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
673
674        // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
675        // then ignore configuring it because it won't exist
676        if (monitor == null) {
677            return;
678        }
679
680        // Client has sent a valid CONNECT frame, we can stop the connect checker.
681        monitor.stopConnectChecker();
682
683        long keepAliveMS = keepAliveSeconds * 1000L;
684
685        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
686
687        try {
688            // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
689
690            // we'll observe the server-side configured default value (note, no grace period)
691            if (keepAliveMS == 0 && defaultKeepAlive > 0) {
692                keepAliveMS = defaultKeepAlive;
693            }
694
695            long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
696
697            monitor.setProtocolConverter(this);
698            monitor.setReadKeepAliveTime(keepAliveMS);
699            monitor.setReadGraceTime(readGracePeriod);
700            monitor.startReadChecker();
701
702            LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)",
703                      new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
704        } catch (Exception ex) {
705            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
706        }
707    }
708
709    void handleException(Throwable exception, MQTTFrame command) {
710        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
711        LOG.debug("Exception detail", exception);
712
713        if (connected.get() && connectionInfo != null) {
714            connected.set(false);
715            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
716        }
717        stopTransport();
718    }
719
720    void checkConnected() throws MQTTProtocolException {
721        if (!connected.get()) {
722            throw new MQTTProtocolException("Not connected.");
723        }
724    }
725
726    private void stopTransport() {
727        try {
728            getMQTTTransport().stop();
729        } catch (Throwable e) {
730            LOG.debug("Failed to stop MQTT transport ", e);
731        }
732    }
733
734    ResponseHandler createResponseHandler(final PUBLISH command) {
735        if (command != null) {
736            return new ResponseHandler() {
737                @Override
738                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
739                    if (response.isException()) {
740                        Throwable error = ((ExceptionResponse) response).getException();
741                        LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage());
742                        LOG.trace("Error trace: {}", error);
743                    }
744
745                    switch (command.qos()) {
746                        case AT_LEAST_ONCE:
747                            PUBACK ack = new PUBACK();
748                            ack.messageId(command.messageId());
749                            LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
750                                      command.messageId(), clientId, connectionInfo.getConnectionId());
751                            converter.getMQTTTransport().sendToMQTT(ack.encode());
752                            break;
753                        case EXACTLY_ONCE:
754                            PUBREC req = new PUBREC();
755                            req.messageId(command.messageId());
756                            synchronized (publisherRecs) {
757                                publisherRecs.put(command.messageId(), req);
758                            }
759                            LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}",
760                                      command.messageId(), clientId, connectionInfo.getConnectionId());
761                            converter.getMQTTTransport().sendToMQTT(req.encode());
762                            break;
763                        default:
764                            break;
765                    }
766                }
767            };
768        }
769        return null;
770    }
771
772    public long getDefaultKeepAlive() {
773        return defaultKeepAlive;
774    }
775
776    /**
777     * Set the default keep alive time (in milliseconds) that would be used if configured on server side
778     * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
779     * @param keepAlive the keepAlive in milliseconds
780     */
781    public void setDefaultKeepAlive(long keepAlive) {
782        this.defaultKeepAlive = keepAlive;
783    }
784
785    public int getActiveMQSubscriptionPrefetch() {
786        return activeMQSubscriptionPrefetch;
787    }
788
789    /**
790     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
791     * The default = 1
792     *
793     * @param activeMQSubscriptionPrefetch
794     *        set the prefetch for the corresponding ActiveMQ subscription
795     */
796    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
797        this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
798    }
799
800    public MQTTPacketIdGenerator getPacketIdGenerator() {
801        return packetIdGenerator;
802    }
803
804    public void setPublishDollarTopics(boolean publishDollarTopics) {
805        this.publishDollarTopics = publishDollarTopics;
806    }
807
808    public boolean getPublishDollarTopics() {
809        return publishDollarTopics;
810    }
811
812    public ConnectionId getConnectionId() {
813        return connectionId;
814    }
815
816    public SessionId getSessionId() {
817        return sessionId;
818    }
819
820    public boolean isCleanSession() {
821        return this.connect.cleanSession();
822    }
823
824    public String getSubscriptionStrategy() {
825        return subscriptionStrategyName;
826    }
827
828    public void setSubscriptionStrategy(String name) {
829        this.subscriptionStrategyName = name;
830    }
831
832    public String getClientId() {
833        if (clientId == null) {
834            if (connect != null && connect.clientId() != null) {
835                clientId = connect.clientId().toString();
836            } else {
837                clientId = "";
838            }
839        }
840        return clientId;
841    }
842
843    protected boolean containsMqttWildcard(String value) {
844        return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) ||
845                value.contains(MULTI_LEVEL_WILDCARD));
846    }
847
848    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
849        if (subsciptionStrategy == null) {
850            synchronized (STRATAGY_FINDER) {
851                if (subsciptionStrategy != null) {
852                    return subsciptionStrategy;
853                }
854
855                MQTTSubscriptionStrategy strategy = null;
856                if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
857                    try {
858                        strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
859                        LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
860                        if (strategy instanceof BrokerServiceAware) {
861                            ((BrokerServiceAware)strategy).setBrokerService(brokerService);
862                        }
863                        strategy.initialize(this);
864                    } catch (Exception e) {
865                        throw IOExceptionSupport.create(e);
866                    }
867                } else {
868                    throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
869                }
870
871                this.subsciptionStrategy = strategy;
872            }
873        }
874        return subsciptionStrategy;
875    }
876
877    // for testing
878    public void setSubsciptionStrategy(MQTTSubscriptionStrategy subsciptionStrategy) {
879        this.subsciptionStrategy = subsciptionStrategy;
880    }
881}