001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.zip.DataFormatException;
025import java.util.zip.Inflater;
026
027import javax.jms.Destination;
028import javax.jms.InvalidClientIDException;
029import javax.jms.JMSException;
030import javax.jms.Message;
031import javax.security.auth.login.CredentialException;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
036import org.apache.activemq.command.ActiveMQBytesMessage;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMapMessage;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTextMessage;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionError;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.ConnectionInfo;
045import org.apache.activemq.command.ExceptionResponse;
046import org.apache.activemq.command.MessageAck;
047import org.apache.activemq.command.MessageDispatch;
048import org.apache.activemq.command.MessageId;
049import org.apache.activemq.command.ProducerId;
050import org.apache.activemq.command.ProducerInfo;
051import org.apache.activemq.command.Response;
052import org.apache.activemq.command.SessionId;
053import org.apache.activemq.command.SessionInfo;
054import org.apache.activemq.command.ShutdownInfo;
055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
056import org.apache.activemq.util.ByteArrayOutputStream;
057import org.apache.activemq.util.ByteSequence;
058import org.apache.activemq.util.FactoryFinder;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IdGenerator;
061import org.apache.activemq.util.JMSExceptionSupport;
062import org.apache.activemq.util.LRUCache;
063import org.apache.activemq.util.LongSequenceGenerator;
064import org.fusesource.hawtbuf.Buffer;
065import org.fusesource.hawtbuf.UTF8Buffer;
066import org.fusesource.mqtt.client.QoS;
067import org.fusesource.mqtt.client.Topic;
068import org.fusesource.mqtt.codec.CONNACK;
069import org.fusesource.mqtt.codec.CONNECT;
070import org.fusesource.mqtt.codec.DISCONNECT;
071import org.fusesource.mqtt.codec.MQTTFrame;
072import org.fusesource.mqtt.codec.PINGREQ;
073import org.fusesource.mqtt.codec.PINGRESP;
074import org.fusesource.mqtt.codec.PUBACK;
075import org.fusesource.mqtt.codec.PUBCOMP;
076import org.fusesource.mqtt.codec.PUBLISH;
077import org.fusesource.mqtt.codec.PUBREC;
078import org.fusesource.mqtt.codec.PUBREL;
079import org.fusesource.mqtt.codec.SUBACK;
080import org.fusesource.mqtt.codec.SUBSCRIBE;
081import org.fusesource.mqtt.codec.UNSUBACK;
082import org.fusesource.mqtt.codec.UNSUBSCRIBE;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086public class MQTTProtocolConverter {
087
088    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
089
090    public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
091    public static final int V3_1 = 3;
092    public static final int V3_1_1 = 4;
093
094    public static final String SINGLE_LEVEL_WILDCARD = "+";
095    public static final String MULTI_LEVEL_WILDCARD = "#";
096
097    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
098    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
099    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
100    static final int DEFAULT_CACHE_SIZE = 5000;
101
102    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
103    private final SessionId sessionId = new SessionId(connectionId, -1);
104    private final ProducerId producerId = new ProducerId(sessionId, 1);
105    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
106
107    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
108    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
109    private final Map<ActiveMQDestination, String> mqttTopicMap = new LRUCache<ActiveMQDestination, String>(DEFAULT_CACHE_SIZE);
110
111    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
112    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
113
114    private final MQTTTransport mqttTransport;
115    private final BrokerService brokerService;
116
117    private final Object commnadIdMutex = new Object();
118    private int lastCommandId;
119    private final AtomicBoolean connected = new AtomicBoolean(false);
120    private final ConnectionInfo connectionInfo = new ConnectionInfo();
121    private CONNECT connect;
122    private String clientId;
123    private long defaultKeepAlive;
124    private int activeMQSubscriptionPrefetch = -1;
125    private final MQTTPacketIdGenerator packetIdGenerator;
126    private boolean publishDollarTopics;
127
128    public int version;
129
130    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
131
132    /*
133     * Subscription strategy configuration element.
134     *   > mqtt-default-subscriptions
135     *   > mqtt-virtual-topic-subscriptions
136     */
137    private String subscriptionStrategyName = "mqtt-default-subscriptions";
138    private MQTTSubscriptionStrategy subsciptionStrategy;
139
140    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
141        this.mqttTransport = mqttTransport;
142        this.brokerService = brokerService;
143        this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
144        this.defaultKeepAlive = 0;
145    }
146
147    int generateCommandId() {
148        synchronized (commnadIdMutex) {
149            return lastCommandId++;
150        }
151    }
152
153    public void sendToActiveMQ(Command command, ResponseHandler handler) {
154
155        // Lets intercept message send requests..
156        if (command instanceof ActiveMQMessage) {
157            ActiveMQMessage msg = (ActiveMQMessage) command;
158            try {
159                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) {
160                    // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
161                    // specification requirements for system assigned destinations.
162                    if (handler != null) {
163                        try {
164                            handler.onResponse(this, new Response());
165                        } catch (IOException e) {
166                            e.printStackTrace();
167                        }
168                    }
169                    return;
170                }
171            } catch (IOException e) {
172                e.printStackTrace();
173            }
174        }
175
176        command.setCommandId(generateCommandId());
177        if (handler != null) {
178            command.setResponseRequired(true);
179            resposeHandlers.put(command.getCommandId(), handler);
180        }
181        getMQTTTransport().sendToActiveMQ(command);
182    }
183
184    void sendToMQTT(MQTTFrame frame) {
185        try {
186            mqttTransport.sendToMQTT(frame);
187        } catch (IOException e) {
188            LOG.warn("Failed to send frame " + frame, e);
189        }
190    }
191
192    /**
193     * Convert a MQTT command
194     */
195    public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
196        switch (frame.messageType()) {
197            case PINGREQ.TYPE:
198                LOG.debug("Received a ping from client: " + getClientId());
199                sendToMQTT(PING_RESP_FRAME);
200                LOG.debug("Sent Ping Response to " + getClientId());
201                break;
202            case CONNECT.TYPE:
203                CONNECT connect = new CONNECT().decode(frame);
204                onMQTTConnect(connect);
205                LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
206                break;
207            case DISCONNECT.TYPE:
208                LOG.debug("MQTT Client {} disconnecting", getClientId());
209                onMQTTDisconnect();
210                break;
211            case SUBSCRIBE.TYPE:
212                onSubscribe(new SUBSCRIBE().decode(frame));
213                break;
214            case UNSUBSCRIBE.TYPE:
215                onUnSubscribe(new UNSUBSCRIBE().decode(frame));
216                break;
217            case PUBLISH.TYPE:
218                onMQTTPublish(new PUBLISH().decode(frame));
219                break;
220            case PUBACK.TYPE:
221                onMQTTPubAck(new PUBACK().decode(frame));
222                break;
223            case PUBREC.TYPE:
224                onMQTTPubRec(new PUBREC().decode(frame));
225                break;
226            case PUBREL.TYPE:
227                onMQTTPubRel(new PUBREL().decode(frame));
228                break;
229            case PUBCOMP.TYPE:
230                onMQTTPubComp(new PUBCOMP().decode(frame));
231                break;
232            default:
233                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
234        }
235    }
236
237    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
238        if (connected.get()) {
239            throw new MQTTProtocolException("Already connected.");
240        }
241        this.connect = connect;
242
243        String clientId = "";
244        if (connect.clientId() != null) {
245            clientId = connect.clientId().toString();
246        }
247
248        String userName = null;
249        if (connect.userName() != null) {
250            userName = connect.userName().toString();
251        }
252        String passswd = null;
253        if (connect.password() != null) {
254            passswd = connect.password().toString();
255        }
256
257        version = connect.version();
258
259        configureInactivityMonitor(connect.keepAlive());
260
261        connectionInfo.setConnectionId(connectionId);
262        if (clientId != null && !clientId.isEmpty()) {
263            connectionInfo.setClientId(clientId);
264        } else {
265            // Clean Session MUST be set for 0 length Client Id
266            if (!connect.cleanSession()) {
267                CONNACK ack = new CONNACK();
268                ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
269                try {
270                    getMQTTTransport().sendToMQTT(ack.encode());
271                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
272                } catch (IOException e) {
273                    getMQTTTransport().onException(IOExceptionSupport.create(e));
274                }
275                return;
276            }
277            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
278        }
279
280        connectionInfo.setResponseRequired(true);
281        connectionInfo.setUserName(userName);
282        connectionInfo.setPassword(passswd);
283        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
284
285        sendToActiveMQ(connectionInfo, new ResponseHandler() {
286            @Override
287            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
288
289                if (response.isException()) {
290                    // If the connection attempt fails we close the socket.
291                    Throwable exception = ((ExceptionResponse) response).getException();
292                    //let the client know
293                    CONNACK ack = new CONNACK();
294                    if (exception instanceof InvalidClientIDException) {
295                        ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
296                    } else if (exception instanceof SecurityException) {
297                        ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
298                    } else if (exception instanceof CredentialException) {
299                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
300                    } else {
301                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
302                    }
303                    getMQTTTransport().sendToMQTT(ack.encode());
304                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
305                    return;
306                }
307
308                final SessionInfo sessionInfo = new SessionInfo(sessionId);
309                sendToActiveMQ(sessionInfo, null);
310
311                final ProducerInfo producerInfo = new ProducerInfo(producerId);
312                sendToActiveMQ(producerInfo, new ResponseHandler() {
313                    @Override
314                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
315
316                        if (response.isException()) {
317                            // If the connection attempt fails we close the socket.
318                            Throwable exception = ((ExceptionResponse) response).getException();
319                            CONNACK ack = new CONNACK();
320                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
321                            getMQTTTransport().sendToMQTT(ack.encode());
322                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
323                            return;
324                        }
325
326                        CONNACK ack = new CONNACK();
327                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
328                        connected.set(true);
329                        getMQTTTransport().sendToMQTT(ack.encode());
330
331                        if (connect.cleanSession()) {
332                            packetIdGenerator.stopClientSession(getClientId());
333                        } else {
334                            packetIdGenerator.startClientSession(getClientId());
335                        }
336
337                        findSubscriptionStrategy().onConnect(connect);
338                    }
339                });
340            }
341        });
342    }
343
344    void onMQTTDisconnect() throws MQTTProtocolException {
345        if (connected.get()) {
346            connected.set(false);
347            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
348            sendToActiveMQ(new ShutdownInfo(), null);
349        }
350        stopTransport();
351    }
352
353    void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
354        checkConnected();
355        LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}",
356                  command.messageId(), clientId, connectionInfo.getConnectionId());
357        Topic[] topics = command.topics();
358        if (topics != null) {
359            byte[] qos = new byte[topics.length];
360            for (int i = 0; i < topics.length; i++) {
361                try {
362                    qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
363                } catch (IOException e) {
364                    throw new MQTTProtocolException("Failed to process subscription request", true, e);
365                }
366            }
367            SUBACK ack = new SUBACK();
368            ack.messageId(command.messageId());
369            ack.grantedQos(qos);
370            try {
371                getMQTTTransport().sendToMQTT(ack.encode());
372            } catch (IOException e) {
373                LOG.warn("Couldn't send SUBACK for " + command, e);
374            }
375        } else {
376            LOG.warn("No topics defined for Subscription " + command);
377        }
378    }
379
380    public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
381        checkConnected();
382        if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) {
383            throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS"));
384        }
385        UTF8Buffer[] topics = command.topics();
386        if (topics != null) {
387            for (UTF8Buffer topic : topics) {
388                try {
389                    findSubscriptionStrategy().onUnSubscribe(topic.toString());
390                } catch (IOException e) {
391                    throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
392                }
393            }
394        }
395        UNSUBACK ack = new UNSUBACK();
396        ack.messageId(command.messageId());
397        sendToMQTT(ack.encode());
398    }
399
400    /**
401     * Dispatch an ActiveMQ command
402     */
403    public void onActiveMQCommand(Command command) throws Exception {
404        if (command.isResponse()) {
405            Response response = (Response) command;
406            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
407            if (rh != null) {
408                rh.onResponse(this, response);
409            } else {
410                // Pass down any unexpected errors. Should this close the connection?
411                if (response.isException()) {
412                    Throwable exception = ((ExceptionResponse) response).getException();
413                    handleException(exception, null);
414                }
415            }
416        } else if (command.isMessageDispatch()) {
417            MessageDispatch md = (MessageDispatch) command;
418            MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
419            if (sub != null) {
420                MessageAck ack = sub.createMessageAck(md);
421                PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
422                switch (publish.qos()) {
423                    case AT_LEAST_ONCE:
424                    case EXACTLY_ONCE:
425                        publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
426                    case AT_MOST_ONCE:
427                }
428                if (ack != null && sub.expectAck(publish)) {
429                    synchronized (consumerAcks) {
430                        consumerAcks.put(publish.messageId(), ack);
431                    }
432                }
433                LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}",
434                          publish.messageId(), clientId, connectionInfo.getConnectionId());
435                getMQTTTransport().sendToMQTT(publish.encode());
436                if (ack != null && !sub.expectAck(publish)) {
437                    getMQTTTransport().sendToActiveMQ(ack);
438                }
439            }
440        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
441            // Pass down any unexpected async errors. Should this close the connection?
442            Throwable exception = ((ConnectionError) command).getException();
443            handleException(exception, null);
444        } else if (command.isBrokerInfo()) {
445            //ignore
446        } else {
447            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
448        }
449    }
450
451    void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
452        checkConnected();
453        LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
454                  command.messageId(), clientId, connectionInfo.getConnectionId());
455        //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet
456        if (containsMqttWildcard(command.topicName().toString())) {
457            // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters
458            getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null));
459            return;
460        }
461        ActiveMQMessage message = convertMessage(command);
462        message.setProducerId(producerId);
463        message.onSend();
464        sendToActiveMQ(message, createResponseHandler(command));
465    }
466
467    void onMQTTPubAck(PUBACK command) {
468        short messageId = command.messageId();
469        LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}",
470                  messageId, clientId, connectionInfo.getConnectionId());
471        packetIdGenerator.ackPacketId(getClientId(), messageId);
472        MessageAck ack;
473        synchronized (consumerAcks) {
474            ack = consumerAcks.remove(messageId);
475        }
476        if (ack != null) {
477            getMQTTTransport().sendToActiveMQ(ack);
478        }
479    }
480
481    void onMQTTPubRec(PUBREC commnand) {
482        //from a subscriber - send a PUBREL in response
483        PUBREL pubrel = new PUBREL();
484        pubrel.messageId(commnand.messageId());
485        sendToMQTT(pubrel.encode());
486    }
487
488    void onMQTTPubRel(PUBREL command) {
489        PUBREC ack;
490        synchronized (publisherRecs) {
491            ack = publisherRecs.remove(command.messageId());
492        }
493        if (ack == null) {
494            LOG.warn("Unknown PUBREL: {} received", command.messageId());
495        }
496        PUBCOMP pubcomp = new PUBCOMP();
497        pubcomp.messageId(command.messageId());
498        sendToMQTT(pubcomp.encode());
499    }
500
501    void onMQTTPubComp(PUBCOMP command) {
502        short messageId = command.messageId();
503        packetIdGenerator.ackPacketId(getClientId(), messageId);
504        MessageAck ack;
505        synchronized (consumerAcks) {
506            ack = consumerAcks.remove(messageId);
507        }
508        if (ack != null) {
509            getMQTTTransport().sendToActiveMQ(ack);
510        }
511    }
512
513    ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
514        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
515
516        msg.setProducerId(producerId);
517        MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
518        msg.setMessageId(id);
519        LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
520                command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
521        msg.setTimestamp(System.currentTimeMillis());
522        msg.setPriority((byte) Message.DEFAULT_PRIORITY);
523        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
524        msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
525        if (command.retain()) {
526            msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
527        }
528
529        ActiveMQDestination destination;
530        synchronized (activeMQDestinationMap) {
531            destination = activeMQDestinationMap.get(command.topicName());
532            if (destination == null) {
533                String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
534                try {
535                    destination = findSubscriptionStrategy().onSend(topicName);
536                } catch (IOException e) {
537                    throw JMSExceptionSupport.create(e);
538                }
539
540                activeMQDestinationMap.put(command.topicName().toString(), destination);
541            }
542        }
543
544        msg.setJMSDestination(destination);
545        msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
546        return msg;
547    }
548
549    public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
550        PUBLISH result = new PUBLISH();
551        // packet id is set in MQTTSubscription
552        QoS qoS;
553        if (message.propertyExists(QOS_PROPERTY_NAME)) {
554            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
555            qoS = QoS.values()[ordinal];
556
557        } else {
558            qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
559        }
560        result.qos(qoS);
561        if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
562            result.retain(true);
563        }
564
565        String topicName;
566        synchronized (mqttTopicMap) {
567            ActiveMQDestination destination = message.getDestination();
568            if (destination.isPattern() && message.getOriginalDestination() != null) {
569                destination = message.getOriginalDestination();
570            }
571            topicName = mqttTopicMap.get(destination);
572            if (topicName == null) {
573                String amqTopicName = findSubscriptionStrategy().onSend(destination);
574                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
575                mqttTopicMap.put(destination, topicName);
576            }
577        }
578        result.topicName(new UTF8Buffer(topicName));
579
580        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
581            ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
582            msg.setReadOnlyBody(true);
583            String messageText = msg.getText();
584            if (messageText != null) {
585                result.payload(new Buffer(messageText.getBytes("UTF-8")));
586            }
587        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
588            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
589            msg.setReadOnlyBody(true);
590            byte[] data = new byte[(int) msg.getBodyLength()];
591            msg.readBytes(data);
592            result.payload(new Buffer(data));
593        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
594            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
595            msg.setReadOnlyBody(true);
596            Map<String, Object> map = msg.getContentMap();
597            if (map != null) {
598                result.payload(new Buffer(map.toString().getBytes("UTF-8")));
599            }
600        } else {
601            ByteSequence byteSequence = message.getContent();
602            if (byteSequence != null && byteSequence.getLength() > 0) {
603                if (message.isCompressed()) {
604                    Inflater inflater = new Inflater();
605                    inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
606                    byte[] data = new byte[4096];
607                    int read;
608                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
609                    while ((read = inflater.inflate(data)) != 0) {
610                        bytesOut.write(data, 0, read);
611                    }
612                    byteSequence = bytesOut.toByteSequence();
613                    bytesOut.close();
614                }
615                result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
616            }
617        }
618        LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
619                result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId());
620        return result;
621    }
622
623    public MQTTTransport getMQTTTransport() {
624        return mqttTransport;
625    }
626
627    boolean willSent = false;
628    public void onTransportError() {
629        if (connect != null) {
630            if (connected.get()) {
631                if (connect.willTopic() != null && connect.willMessage() != null && !willSent) {
632                    willSent = true;
633                    try {
634                        PUBLISH publish = new PUBLISH();
635                        publish.topicName(connect.willTopic());
636                        publish.qos(connect.willQos());
637                        publish.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
638                        publish.payload(connect.willMessage());
639                        publish.retain(connect.willRetain());
640                        ActiveMQMessage message = convertMessage(publish);
641                        message.setProducerId(producerId);
642                        message.onSend();
643
644                        sendToActiveMQ(message, null);
645                    } catch (Exception e) {
646                        LOG.warn("Failed to publish Will Message " + connect.willMessage());
647                    }
648                }
649                // remove connection info
650                sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
651            }
652        }
653    }
654
655    void configureInactivityMonitor(short keepAliveSeconds) {
656        MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
657
658        // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
659        // then ignore configuring it because it won't exist
660        if (monitor == null) {
661            return;
662        }
663
664        // Client has sent a valid CONNECT frame, we can stop the connect checker.
665        monitor.stopConnectChecker();
666
667        long keepAliveMS = keepAliveSeconds * 1000;
668
669        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
670
671        try {
672            // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
673
674            // we'll observe the server-side configured default value (note, no grace period)
675            if (keepAliveMS == 0 && defaultKeepAlive > 0) {
676                keepAliveMS = defaultKeepAlive;
677            }
678
679            long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
680
681            monitor.setProtocolConverter(this);
682            monitor.setReadKeepAliveTime(keepAliveMS);
683            monitor.setReadGraceTime(readGracePeriod);
684            monitor.startReadChecker();
685
686            LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)",
687                      new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
688        } catch (Exception ex) {
689            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
690        }
691    }
692
693    void handleException(Throwable exception, MQTTFrame command) {
694        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
695        LOG.debug("Exception detail", exception);
696
697        if (connected.get() && connectionInfo != null) {
698            connected.set(false);
699            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
700        }
701        stopTransport();
702    }
703
704    void checkConnected() throws MQTTProtocolException {
705        if (!connected.get()) {
706            throw new MQTTProtocolException("Not connected.");
707        }
708    }
709
710    private void stopTransport() {
711        try {
712            getMQTTTransport().stop();
713        } catch (Throwable e) {
714            LOG.debug("Failed to stop MQTT transport ", e);
715        }
716    }
717
718    ResponseHandler createResponseHandler(final PUBLISH command) {
719        if (command != null) {
720            return new ResponseHandler() {
721                @Override
722                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
723                    if (response.isException()) {
724                        Throwable error = ((ExceptionResponse) response).getException();
725                        LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage());
726                        LOG.trace("Error trace: {}", error);
727                    }
728
729                    switch (command.qos()) {
730                        case AT_LEAST_ONCE:
731                            PUBACK ack = new PUBACK();
732                            ack.messageId(command.messageId());
733                            LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
734                                      command.messageId(), clientId, connectionInfo.getConnectionId());
735                            converter.getMQTTTransport().sendToMQTT(ack.encode());
736                            break;
737                        case EXACTLY_ONCE:
738                            PUBREC req = new PUBREC();
739                            req.messageId(command.messageId());
740                            synchronized (publisherRecs) {
741                                publisherRecs.put(command.messageId(), req);
742                            }
743                            LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}",
744                                      command.messageId(), clientId, connectionInfo.getConnectionId());
745                            converter.getMQTTTransport().sendToMQTT(req.encode());
746                            break;
747                        default:
748                            break;
749                    }
750                }
751            };
752        }
753        return null;
754    }
755
756    public long getDefaultKeepAlive() {
757        return defaultKeepAlive;
758    }
759
760    /**
761     * Set the default keep alive time (in milliseconds) that would be used if configured on server side
762     * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
763     * @param keepAlive the keepAlive in milliseconds
764     */
765    public void setDefaultKeepAlive(long keepAlive) {
766        this.defaultKeepAlive = keepAlive;
767    }
768
769    public int getActiveMQSubscriptionPrefetch() {
770        return activeMQSubscriptionPrefetch;
771    }
772
773    /**
774     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
775     * The default = 1
776     *
777     * @param activeMQSubscriptionPrefetch
778     *        set the prefetch for the corresponding ActiveMQ subscription
779     */
780    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
781        this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
782    }
783
784    public MQTTPacketIdGenerator getPacketIdGenerator() {
785        return packetIdGenerator;
786    }
787
788    public void setPublishDollarTopics(boolean publishDollarTopics) {
789        this.publishDollarTopics = publishDollarTopics;
790    }
791
792    public boolean getPublishDollarTopics() {
793        return publishDollarTopics;
794    }
795
796    public ConnectionId getConnectionId() {
797        return connectionId;
798    }
799
800    public SessionId getSessionId() {
801        return sessionId;
802    }
803
804    public boolean isCleanSession() {
805        return this.connect.cleanSession();
806    }
807
808    public String getSubscriptionStrategy() {
809        return subscriptionStrategyName;
810    }
811
812    public void setSubscriptionStrategy(String name) {
813        this.subscriptionStrategyName = name;
814    }
815
816    public String getClientId() {
817        if (clientId == null) {
818            if (connect != null && connect.clientId() != null) {
819                clientId = connect.clientId().toString();
820            } else {
821                clientId = "";
822            }
823        }
824        return clientId;
825    }
826
827    protected boolean containsMqttWildcard(String value) {
828        return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) ||
829                value.contains(MULTI_LEVEL_WILDCARD));
830    }
831
832    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
833        if (subsciptionStrategy == null) {
834            synchronized (STRATAGY_FINDER) {
835                if (subsciptionStrategy != null) {
836                    return subsciptionStrategy;
837                }
838
839                MQTTSubscriptionStrategy strategy = null;
840                if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
841                    try {
842                        strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
843                        LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
844                        if (strategy instanceof BrokerServiceAware) {
845                            ((BrokerServiceAware)strategy).setBrokerService(brokerService);
846                        }
847                        strategy.initialize(this);
848                    } catch (Exception e) {
849                        throw IOExceptionSupport.create(e);
850                    }
851                } else {
852                    throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
853                }
854
855                this.subsciptionStrategy = strategy;
856            }
857        }
858        return subsciptionStrategy;
859    }
860}