001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.zip.DataFormatException;
025import java.util.zip.Inflater;
026
027import javax.jms.Destination;
028import javax.jms.InvalidClientIDException;
029import javax.jms.JMSException;
030import javax.jms.Message;
031import javax.security.auth.login.CredentialException;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
036import org.apache.activemq.command.ActiveMQBytesMessage;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMapMessage;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTextMessage;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionError;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.ConnectionInfo;
045import org.apache.activemq.command.ExceptionResponse;
046import org.apache.activemq.command.MessageAck;
047import org.apache.activemq.command.MessageDispatch;
048import org.apache.activemq.command.MessageId;
049import org.apache.activemq.command.ProducerId;
050import org.apache.activemq.command.ProducerInfo;
051import org.apache.activemq.command.Response;
052import org.apache.activemq.command.SessionId;
053import org.apache.activemq.command.SessionInfo;
054import org.apache.activemq.command.ShutdownInfo;
055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
056import org.apache.activemq.util.ByteArrayOutputStream;
057import org.apache.activemq.util.ByteSequence;
058import org.apache.activemq.util.FactoryFinder;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IdGenerator;
061import org.apache.activemq.util.JMSExceptionSupport;
062import org.apache.activemq.util.LRUCache;
063import org.apache.activemq.util.LongSequenceGenerator;
064import org.fusesource.hawtbuf.Buffer;
065import org.fusesource.hawtbuf.UTF8Buffer;
066import org.fusesource.mqtt.client.QoS;
067import org.fusesource.mqtt.client.Topic;
068import org.fusesource.mqtt.codec.CONNACK;
069import org.fusesource.mqtt.codec.CONNECT;
070import org.fusesource.mqtt.codec.DISCONNECT;
071import org.fusesource.mqtt.codec.MQTTFrame;
072import org.fusesource.mqtt.codec.PINGREQ;
073import org.fusesource.mqtt.codec.PINGRESP;
074import org.fusesource.mqtt.codec.PUBACK;
075import org.fusesource.mqtt.codec.PUBCOMP;
076import org.fusesource.mqtt.codec.PUBLISH;
077import org.fusesource.mqtt.codec.PUBREC;
078import org.fusesource.mqtt.codec.PUBREL;
079import org.fusesource.mqtt.codec.SUBACK;
080import org.fusesource.mqtt.codec.SUBSCRIBE;
081import org.fusesource.mqtt.codec.UNSUBACK;
082import org.fusesource.mqtt.codec.UNSUBSCRIBE;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086public class MQTTProtocolConverter {
087
    /** Logger shared by all converter instances. */
    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);

    /** Name of the int message property that stores the ordinal of the MQTT QoS a message was published with. */
    public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
    /** Value of {@link #version} that the UNSUBSCRIBE handling compares against for MQTT 3.1. */
    public static final int V3_1 = 3;
    /** NOTE(review): presumably the {@link #version} value for MQTT 3.1.1 - confirm against its users. */
    public static final int V3_1_1 = 4;

    /** Source of the unique value used to build each converter's {@link ConnectionId}. */
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    /** Pre-encoded PINGRESP frame, sent in reply to every PINGREQ. */
    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
    /** Fraction of the keep-alive interval that is configured as read grace time on the inactivity monitor. */
    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
    /** Size argument passed to every {@link LRUCache} created by this class. */
    static final int DEFAULT_CACHE_SIZE = 5000;

    // Broker-side identifiers for this MQTT connection; each one is derived from the previous one.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);
    /** Supplies the sequence number of the {@link MessageId} given to each message converted from a PUBLISH. */
    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();

    /** Handlers waiting for a broker response, keyed by the command id of the command they were registered with. */
    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    /** Cache of MQTT topic name to ActiveMQ destination; accessed while synchronized on the map itself. */
    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
    /** Cache of JMS destination to MQTT topic name; accessed while synchronized on the map itself. */
    private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);

    /** Broker acks held back until the client acknowledges the PUBLISH with the same packet id. */
    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
    /** PUBREC frames recorded per packet id and removed again when the matching PUBREL arrives. */
    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);

    private final MQTTTransport mqttTransport;
    private final BrokerService brokerService;

    /** Guards {@link #lastCommandId}. */
    private final Object commnadIdMutex = new Object();
    /** Next command id to hand out; see {@code generateCommandId()}. */
    private int lastCommandId;
    /** Set to true once the broker has accepted the connection and producer; cleared on disconnect or error. */
    private final AtomicBoolean connected = new AtomicBoolean(false);
    /** Connection info sent to the broker on CONNECT and used to build the remove command on teardown. */
    private final ConnectionInfo connectionInfo = new ConnectionInfo();
    /** Most recent CONNECT frame handed to {@code onMQTTConnect}; null until then. */
    private CONNECT connect;
    // NOTE(review): no assignment to clientId is visible in this part of the file; it is read by the
    // trace logging - presumably it is populated by getClientId(), confirm.
    private String clientId;
    /** Keep-alive used when the client requests 0; only applied when greater than zero. */
    private long defaultKeepAlive;
    private int activeMQSubscriptionPrefetch = -1;
    private final MQTTPacketIdGenerator packetIdGenerator;
    /** When false, messages sent to control topics are dropped instead of being forwarded to the broker. */
    private boolean publishDollarTopics;

    /** Protocol version reported by the client's CONNECT frame. */
    public int version;

    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");

    /*
     * Subscription strategy configuration element. Known values:
     *   > mqtt-default-subscriptions
     *   > mqtt-virtual-topic-subscriptions
     */
    private String subscriptionStrategyName = "mqtt-default-subscriptions";
    private MQTTSubscriptionStrategy subsciptionStrategy;
136
137    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
138        this.mqttTransport = mqttTransport;
139        this.brokerService = brokerService;
140        this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
141        this.defaultKeepAlive = 0;
142    }
143
144    int generateCommandId() {
145        synchronized (commnadIdMutex) {
146            return lastCommandId++;
147        }
148    }
149
150    public void sendToActiveMQ(Command command, ResponseHandler handler) {
151
152        // Lets intercept message send requests..
153        if (command instanceof ActiveMQMessage) {
154            ActiveMQMessage msg = (ActiveMQMessage) command;
155            try {
156                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) {
157                    // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
158                    // specification requirements for system assigned destinations.
159                    if (handler != null) {
160                        try {
161                            handler.onResponse(this, new Response());
162                        } catch (IOException e) {
163                            e.printStackTrace();
164                        }
165                    }
166                    return;
167                }
168            } catch (IOException e) {
169                e.printStackTrace();
170            }
171        }
172
173        command.setCommandId(generateCommandId());
174        if (handler != null) {
175            command.setResponseRequired(true);
176            resposeHandlers.put(command.getCommandId(), handler);
177        }
178        getMQTTTransport().sendToActiveMQ(command);
179    }
180
181    void sendToMQTT(MQTTFrame frame) {
182        try {
183            mqttTransport.sendToMQTT(frame);
184        } catch (IOException e) {
185            LOG.warn("Failed to send frame " + frame, e);
186        }
187    }
188
189    /**
190     * Convert a MQTT command
191     */
192    public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
193        switch (frame.messageType()) {
194            case PINGREQ.TYPE:
195                LOG.debug("Received a ping from client: " + getClientId());
196                sendToMQTT(PING_RESP_FRAME);
197                LOG.debug("Sent Ping Response to " + getClientId());
198                break;
199            case CONNECT.TYPE:
200                CONNECT connect = new CONNECT().decode(frame);
201                onMQTTConnect(connect);
202                LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
203                break;
204            case DISCONNECT.TYPE:
205                LOG.debug("MQTT Client {} disconnecting", getClientId());
206                onMQTTDisconnect();
207                break;
208            case SUBSCRIBE.TYPE:
209                onSubscribe(new SUBSCRIBE().decode(frame));
210                break;
211            case UNSUBSCRIBE.TYPE:
212                onUnSubscribe(new UNSUBSCRIBE().decode(frame));
213                break;
214            case PUBLISH.TYPE:
215                onMQTTPublish(new PUBLISH().decode(frame));
216                break;
217            case PUBACK.TYPE:
218                onMQTTPubAck(new PUBACK().decode(frame));
219                break;
220            case PUBREC.TYPE:
221                onMQTTPubRec(new PUBREC().decode(frame));
222                break;
223            case PUBREL.TYPE:
224                onMQTTPubRel(new PUBREL().decode(frame));
225                break;
226            case PUBCOMP.TYPE:
227                onMQTTPubComp(new PUBCOMP().decode(frame));
228                break;
229            default:
230                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
231        }
232    }
233
234    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
235        if (connected.get()) {
236            throw new MQTTProtocolException("Already connected.");
237        }
238        this.connect = connect;
239
240        String clientId = "";
241        if (connect.clientId() != null) {
242            clientId = connect.clientId().toString();
243        }
244
245        String userName = null;
246        if (connect.userName() != null) {
247            userName = connect.userName().toString();
248        }
249        String passswd = null;
250        if (connect.password() != null) {
251            passswd = connect.password().toString();
252        }
253
254        version = connect.version();
255
256        configureInactivityMonitor(connect.keepAlive());
257
258        connectionInfo.setConnectionId(connectionId);
259        if (clientId != null && !clientId.isEmpty()) {
260            connectionInfo.setClientId(clientId);
261        } else {
262            // Clean Session MUST be set for 0 length Client Id
263            if (!connect.cleanSession()) {
264                CONNACK ack = new CONNACK();
265                ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
266                try {
267                    getMQTTTransport().sendToMQTT(ack.encode());
268                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
269                } catch (IOException e) {
270                    getMQTTTransport().onException(IOExceptionSupport.create(e));
271                }
272                return;
273            }
274            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
275        }
276
277        connectionInfo.setResponseRequired(true);
278        connectionInfo.setUserName(userName);
279        connectionInfo.setPassword(passswd);
280        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
281
282        sendToActiveMQ(connectionInfo, new ResponseHandler() {
283            @Override
284            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
285
286                if (response.isException()) {
287                    // If the connection attempt fails we close the socket.
288                    Throwable exception = ((ExceptionResponse) response).getException();
289                    //let the client know
290                    CONNACK ack = new CONNACK();
291                    if (exception instanceof InvalidClientIDException) {
292                        ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
293                    } else if (exception instanceof SecurityException) {
294                        ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
295                    } else if (exception instanceof CredentialException) {
296                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
297                    } else {
298                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
299                    }
300                    getMQTTTransport().sendToMQTT(ack.encode());
301                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
302                    return;
303                }
304
305                final SessionInfo sessionInfo = new SessionInfo(sessionId);
306                sendToActiveMQ(sessionInfo, null);
307
308                final ProducerInfo producerInfo = new ProducerInfo(producerId);
309                sendToActiveMQ(producerInfo, new ResponseHandler() {
310                    @Override
311                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
312
313                        if (response.isException()) {
314                            // If the connection attempt fails we close the socket.
315                            Throwable exception = ((ExceptionResponse) response).getException();
316                            CONNACK ack = new CONNACK();
317                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
318                            getMQTTTransport().sendToMQTT(ack.encode());
319                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
320                            return;
321                        }
322
323                        CONNACK ack = new CONNACK();
324                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
325                        connected.set(true);
326                        getMQTTTransport().sendToMQTT(ack.encode());
327
328                        if (connect.cleanSession()) {
329                            packetIdGenerator.stopClientSession(getClientId());
330                        } else {
331                            packetIdGenerator.startClientSession(getClientId());
332                        }
333
334                        findSubscriptionStrategy().onConnect(connect);
335                    }
336                });
337            }
338        });
339    }
340
341    void onMQTTDisconnect() throws MQTTProtocolException {
342        if (connected.get()) {
343            connected.set(false);
344            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
345            sendToActiveMQ(new ShutdownInfo(), null);
346        }
347        stopTransport();
348    }
349
350    void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
351        checkConnected();
352        LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}",
353                  command.messageId(), clientId, connectionInfo.getConnectionId());
354        Topic[] topics = command.topics();
355        if (topics != null) {
356            byte[] qos = new byte[topics.length];
357            for (int i = 0; i < topics.length; i++) {
358                try {
359                    qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
360                } catch (IOException e) {
361                    throw new MQTTProtocolException("Failed to process subscription request", true, e);
362                }
363            }
364            SUBACK ack = new SUBACK();
365            ack.messageId(command.messageId());
366            ack.grantedQos(qos);
367            try {
368                getMQTTTransport().sendToMQTT(ack.encode());
369            } catch (IOException e) {
370                LOG.warn("Couldn't send SUBACK for " + command, e);
371            }
372        } else {
373            LOG.warn("No topics defined for Subscription " + command);
374        }
375    }
376
377    public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
378        checkConnected();
379        if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) {
380            throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS"));
381        }
382        UTF8Buffer[] topics = command.topics();
383        if (topics != null) {
384            for (UTF8Buffer topic : topics) {
385                try {
386                    findSubscriptionStrategy().onUnSubscribe(topic.toString());
387                } catch (IOException e) {
388                    throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
389                }
390            }
391        }
392        UNSUBACK ack = new UNSUBACK();
393        ack.messageId(command.messageId());
394        sendToMQTT(ack.encode());
395    }
396
397    /**
398     * Dispatch an ActiveMQ command
399     */
400    public void onActiveMQCommand(Command command) throws Exception {
401        if (command.isResponse()) {
402            Response response = (Response) command;
403            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
404            if (rh != null) {
405                rh.onResponse(this, response);
406            } else {
407                // Pass down any unexpected errors. Should this close the connection?
408                if (response.isException()) {
409                    Throwable exception = ((ExceptionResponse) response).getException();
410                    handleException(exception, null);
411                }
412            }
413        } else if (command.isMessageDispatch()) {
414            MessageDispatch md = (MessageDispatch) command;
415            MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
416            if (sub != null) {
417                MessageAck ack = sub.createMessageAck(md);
418                PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
419                switch (publish.qos()) {
420                    case AT_LEAST_ONCE:
421                    case EXACTLY_ONCE:
422                        publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
423                    case AT_MOST_ONCE:
424                }
425                if (ack != null && sub.expectAck(publish)) {
426                    synchronized (consumerAcks) {
427                        consumerAcks.put(publish.messageId(), ack);
428                    }
429                }
430                LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}",
431                          publish.messageId(), clientId, connectionInfo.getConnectionId());
432                getMQTTTransport().sendToMQTT(publish.encode());
433                if (ack != null && !sub.expectAck(publish)) {
434                    getMQTTTransport().sendToActiveMQ(ack);
435                }
436            }
437        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
438            // Pass down any unexpected async errors. Should this close the connection?
439            Throwable exception = ((ConnectionError) command).getException();
440            handleException(exception, null);
441        } else if (command.isBrokerInfo()) {
442            //ignore
443        } else {
444            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
445        }
446    }
447
448    void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
449        checkConnected();
450        LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
451                  command.messageId(), clientId, connectionInfo.getConnectionId());
452        ActiveMQMessage message = convertMessage(command);
453        message.setProducerId(producerId);
454        message.onSend();
455        sendToActiveMQ(message, createResponseHandler(command));
456    }
457
458    void onMQTTPubAck(PUBACK command) {
459        short messageId = command.messageId();
460        LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}",
461                  messageId, clientId, connectionInfo.getConnectionId());
462        packetIdGenerator.ackPacketId(getClientId(), messageId);
463        MessageAck ack;
464        synchronized (consumerAcks) {
465            ack = consumerAcks.remove(messageId);
466        }
467        if (ack != null) {
468            getMQTTTransport().sendToActiveMQ(ack);
469        }
470    }
471
472    void onMQTTPubRec(PUBREC commnand) {
473        //from a subscriber - send a PUBREL in response
474        PUBREL pubrel = new PUBREL();
475        pubrel.messageId(commnand.messageId());
476        sendToMQTT(pubrel.encode());
477    }
478
479    void onMQTTPubRel(PUBREL command) {
480        PUBREC ack;
481        synchronized (publisherRecs) {
482            ack = publisherRecs.remove(command.messageId());
483        }
484        if (ack == null) {
485            LOG.warn("Unknown PUBREL: {} received", command.messageId());
486        }
487        PUBCOMP pubcomp = new PUBCOMP();
488        pubcomp.messageId(command.messageId());
489        sendToMQTT(pubcomp.encode());
490    }
491
492    void onMQTTPubComp(PUBCOMP command) {
493        short messageId = command.messageId();
494        packetIdGenerator.ackPacketId(getClientId(), messageId);
495        MessageAck ack;
496        synchronized (consumerAcks) {
497            ack = consumerAcks.remove(messageId);
498        }
499        if (ack != null) {
500            getMQTTTransport().sendToActiveMQ(ack);
501        }
502    }
503
504    ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
505        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
506
507        msg.setProducerId(producerId);
508        MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
509        msg.setMessageId(id);
510        LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
511                command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
512        msg.setTimestamp(System.currentTimeMillis());
513        msg.setPriority((byte) Message.DEFAULT_PRIORITY);
514        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
515        msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
516        if (command.retain()) {
517            msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
518        }
519
520        ActiveMQDestination destination;
521        synchronized (activeMQDestinationMap) {
522            destination = activeMQDestinationMap.get(command.topicName());
523            if (destination == null) {
524                String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
525                try {
526                    destination = findSubscriptionStrategy().onSend(topicName);
527                } catch (IOException e) {
528                    throw JMSExceptionSupport.create(e);
529                }
530
531                activeMQDestinationMap.put(command.topicName().toString(), destination);
532            }
533        }
534
535        msg.setJMSDestination(destination);
536        msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
537        return msg;
538    }
539
540    public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
541        PUBLISH result = new PUBLISH();
542        // packet id is set in MQTTSubscription
543        QoS qoS;
544        if (message.propertyExists(QOS_PROPERTY_NAME)) {
545            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
546            qoS = QoS.values()[ordinal];
547
548        } else {
549            qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
550        }
551        result.qos(qoS);
552        if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
553            result.retain(true);
554        }
555
556        String topicName;
557        synchronized (mqttTopicMap) {
558            topicName = mqttTopicMap.get(message.getJMSDestination());
559            if (topicName == null) {
560                String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination());
561                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
562                mqttTopicMap.put(message.getJMSDestination(), topicName);
563            }
564        }
565        result.topicName(new UTF8Buffer(topicName));
566
567        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
568            ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
569            msg.setReadOnlyBody(true);
570            String messageText = msg.getText();
571            if (messageText != null) {
572                result.payload(new Buffer(messageText.getBytes("UTF-8")));
573            }
574        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
575            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
576            msg.setReadOnlyBody(true);
577            byte[] data = new byte[(int) msg.getBodyLength()];
578            msg.readBytes(data);
579            result.payload(new Buffer(data));
580        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
581            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
582            msg.setReadOnlyBody(true);
583            Map<String, Object> map = msg.getContentMap();
584            if (map != null) {
585                result.payload(new Buffer(map.toString().getBytes("UTF-8")));
586            }
587        } else {
588            ByteSequence byteSequence = message.getContent();
589            if (byteSequence != null && byteSequence.getLength() > 0) {
590                if (message.isCompressed()) {
591                    Inflater inflater = new Inflater();
592                    inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
593                    byte[] data = new byte[4096];
594                    int read;
595                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
596                    while ((read = inflater.inflate(data)) != 0) {
597                        bytesOut.write(data, 0, read);
598                    }
599                    byteSequence = bytesOut.toByteSequence();
600                    bytesOut.close();
601                }
602                result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
603            }
604        }
605        LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
606                result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId());
607        return result;
608    }
609
610    public MQTTTransport getMQTTTransport() {
611        return mqttTransport;
612    }
613
614    boolean willSent = false;
615    public void onTransportError() {
616        if (connect != null) {
617            if (connected.get()) {
618                if (connect.willTopic() != null && connect.willMessage() != null && !willSent) {
619                    willSent = true;
620                    try {
621                        PUBLISH publish = new PUBLISH();
622                        publish.topicName(connect.willTopic());
623                        publish.qos(connect.willQos());
624                        publish.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
625                        publish.payload(connect.willMessage());
626                        publish.retain(connect.willRetain());
627                        ActiveMQMessage message = convertMessage(publish);
628                        message.setProducerId(producerId);
629                        message.onSend();
630
631                        sendToActiveMQ(message, null);
632                    } catch (Exception e) {
633                        LOG.warn("Failed to publish Will Message " + connect.willMessage());
634                    }
635                }
636                // remove connection info
637                sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
638            }
639        }
640    }
641
642    void configureInactivityMonitor(short keepAliveSeconds) {
643        MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
644
645        // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
646        // then ignore configuring it because it won't exist
647        if (monitor == null) {
648            return;
649        }
650
651        // Client has sent a valid CONNECT frame, we can stop the connect checker.
652        monitor.stopConnectChecker();
653
654        long keepAliveMS = keepAliveSeconds * 1000;
655
656        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
657
658        try {
659            // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
660
661            // we'll observe the server-side configured default value (note, no grace period)
662            if (keepAliveMS == 0 && defaultKeepAlive > 0) {
663                keepAliveMS = defaultKeepAlive;
664            }
665
666            long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
667
668            monitor.setProtocolConverter(this);
669            monitor.setReadKeepAliveTime(keepAliveMS);
670            monitor.setReadGraceTime(readGracePeriod);
671            monitor.startReadChecker();
672
673            LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)",
674                      new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
675        } catch (Exception ex) {
676            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
677        }
678    }
679
680    void handleException(Throwable exception, MQTTFrame command) {
681        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
682        LOG.debug("Exception detail", exception);
683
684        if (connected.get() && connectionInfo != null) {
685            connected.set(false);
686            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
687        }
688        stopTransport();
689    }
690
691    void checkConnected() throws MQTTProtocolException {
692        if (!connected.get()) {
693            throw new MQTTProtocolException("Not connected.");
694        }
695    }
696
697    private void stopTransport() {
698        try {
699            getMQTTTransport().stop();
700        } catch (Throwable e) {
701            LOG.debug("Failed to stop MQTT transport ", e);
702        }
703    }
704
705    ResponseHandler createResponseHandler(final PUBLISH command) {
706        if (command != null) {
707            return new ResponseHandler() {
708                @Override
709                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
710                    if (response.isException()) {
711                        Throwable error = ((ExceptionResponse) response).getException();
712                        LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage());
713                        LOG.trace("Error trace: {}", error);
714                    }
715
716                    switch (command.qos()) {
717                        case AT_LEAST_ONCE:
718                            PUBACK ack = new PUBACK();
719                            ack.messageId(command.messageId());
720                            LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
721                                      command.messageId(), clientId, connectionInfo.getConnectionId());
722                            converter.getMQTTTransport().sendToMQTT(ack.encode());
723                            break;
724                        case EXACTLY_ONCE:
725                            PUBREC req = new PUBREC();
726                            req.messageId(command.messageId());
727                            synchronized (publisherRecs) {
728                                publisherRecs.put(command.messageId(), req);
729                            }
730                            LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}",
731                                      command.messageId(), clientId, connectionInfo.getConnectionId());
732                            converter.getMQTTTransport().sendToMQTT(req.encode());
733                            break;
734                        default:
735                            break;
736                    }
737                }
738            };
739        }
740        return null;
741    }
742
743    public long getDefaultKeepAlive() {
744        return defaultKeepAlive;
745    }
746
747    /**
748     * Set the default keep alive time (in milliseconds) that would be used if configured on server side
749     * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
750     * @param keepAlive the keepAlive in milliseconds
751     */
752    public void setDefaultKeepAlive(long keepAlive) {
753        this.defaultKeepAlive = keepAlive;
754    }
755
756    public int getActiveMQSubscriptionPrefetch() {
757        return activeMQSubscriptionPrefetch;
758    }
759
760    /**
761     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
762     * The default = 1
763     *
764     * @param activeMQSubscriptionPrefetch
765     *        set the prefetch for the corresponding ActiveMQ subscription
766     */
767    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
768        this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
769    }
770
771    public MQTTPacketIdGenerator getPacketIdGenerator() {
772        return packetIdGenerator;
773    }
774
775    public void setPublishDollarTopics(boolean publishDollarTopics) {
776        this.publishDollarTopics = publishDollarTopics;
777    }
778
779    public boolean getPublishDollarTopics() {
780        return publishDollarTopics;
781    }
782
783    public ConnectionId getConnectionId() {
784        return connectionId;
785    }
786
787    public SessionId getSessionId() {
788        return sessionId;
789    }
790
791    public boolean isCleanSession() {
792        return this.connect.cleanSession();
793    }
794
795    public String getSubscriptionStrategy() {
796        return subscriptionStrategyName;
797    }
798
799    public void setSubscriptionStrategy(String name) {
800        this.subscriptionStrategyName = name;
801    }
802
803    public String getClientId() {
804        if (clientId == null) {
805            if (connect != null && connect.clientId() != null) {
806                clientId = connect.clientId().toString();
807            } else {
808                clientId = "";
809            }
810        }
811        return clientId;
812    }
813
814    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
815        if (subsciptionStrategy == null) {
816            synchronized (STRATAGY_FINDER) {
817                if (subsciptionStrategy != null) {
818                    return subsciptionStrategy;
819                }
820
821                MQTTSubscriptionStrategy strategy = null;
822                if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
823                    try {
824                        strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
825                        LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
826                        if (strategy instanceof BrokerServiceAware) {
827                            ((BrokerServiceAware)strategy).setBrokerService(brokerService);
828                        }
829                        strategy.initialize(this);
830                    } catch (Exception e) {
831                        throw IOExceptionSupport.create(e);
832                    }
833                } else {
834                    throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
835                }
836
837                this.subsciptionStrategy = strategy;
838            }
839        }
840        return subsciptionStrategy;
841    }
842}