001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.zip.DataFormatException; 025import java.util.zip.Inflater; 026 027import javax.jms.Destination; 028import javax.jms.InvalidClientIDException; 029import javax.jms.JMSException; 030import javax.jms.Message; 031import javax.security.auth.login.CredentialException; 032 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.BrokerServiceAware; 035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 036import org.apache.activemq.command.ActiveMQBytesMessage; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQMapMessage; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTextMessage; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionError; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.ConnectionInfo; 045import org.apache.activemq.command.ExceptionResponse; 046import org.apache.activemq.command.MessageAck; 047import org.apache.activemq.command.MessageDispatch; 048import org.apache.activemq.command.MessageId; 049import org.apache.activemq.command.ProducerId; 050import org.apache.activemq.command.ProducerInfo; 051import org.apache.activemq.command.Response; 052import org.apache.activemq.command.SessionId; 053import org.apache.activemq.command.SessionInfo; 054import org.apache.activemq.command.ShutdownInfo; 055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy; 056import org.apache.activemq.util.ByteArrayOutputStream; 057import org.apache.activemq.util.ByteSequence; 058import org.apache.activemq.util.FactoryFinder; 059import org.apache.activemq.util.IOExceptionSupport; 060import org.apache.activemq.util.IdGenerator; 061import org.apache.activemq.util.JMSExceptionSupport; 062import org.apache.activemq.util.LRUCache; 063import org.apache.activemq.util.LongSequenceGenerator; 064import org.fusesource.hawtbuf.Buffer; 065import org.fusesource.hawtbuf.UTF8Buffer; 066import org.fusesource.mqtt.client.QoS; 067import org.fusesource.mqtt.client.Topic; 068import org.fusesource.mqtt.codec.CONNACK; 069import org.fusesource.mqtt.codec.CONNECT; 070import org.fusesource.mqtt.codec.DISCONNECT; 071import org.fusesource.mqtt.codec.MQTTFrame; 072import org.fusesource.mqtt.codec.PINGREQ; 073import org.fusesource.mqtt.codec.PINGRESP; 074import org.fusesource.mqtt.codec.PUBACK; 075import org.fusesource.mqtt.codec.PUBCOMP; 076import org.fusesource.mqtt.codec.PUBLISH; 077import org.fusesource.mqtt.codec.PUBREC; 078import org.fusesource.mqtt.codec.PUBREL; 079import org.fusesource.mqtt.codec.SUBACK; 080import org.fusesource.mqtt.codec.SUBSCRIBE; 081import org.fusesource.mqtt.codec.UNSUBACK; 082import org.fusesource.mqtt.codec.UNSUBSCRIBE; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086public class MQTTProtocolConverter { 087 088 private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); 089 090 public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; 091 public static final int V3_1 = 3; 092 public static final int V3_1_1 = 4; 093 094 public static final String SINGLE_LEVEL_WILDCARD = "+"; 095 public static final String MULTI_LEVEL_WILDCARD = "#"; 096 097 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 098 private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); 099 private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; 100 static final int DEFAULT_CACHE_SIZE = 5000; 101 102 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 103 private final SessionId sessionId = new SessionId(connectionId, -1); 104 private final ProducerId producerId = new ProducerId(sessionId, 1); 105 private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); 106 107 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 108 private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); 109 private final Map<ActiveMQDestination, String> mqttTopicMap = new LRUCache<ActiveMQDestination, String>(DEFAULT_CACHE_SIZE); 110 111 private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); 112 private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); 113 114 private final MQTTTransport mqttTransport; 115 private final BrokerService brokerService; 116 117 private final Object commnadIdMutex = new Object(); 118 private int lastCommandId; 119 private final AtomicBoolean connected = new AtomicBoolean(false); 120 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 121 private CONNECT connect; 122 private String clientId; 123 private long defaultKeepAlive; 124 private int activeMQSubscriptionPrefetch = -1; 125 private final MQTTPacketIdGenerator packetIdGenerator; 126 private boolean publishDollarTopics; 127 128 public int version; 129 130 private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); 131 132 /* 133 * Subscription strategy configuration element. 134 * > mqtt-default-subscriptions 135 * > mqtt-virtual-topic-subscriptions 136 */ 137 private String subscriptionStrategyName = "mqtt-default-subscriptions"; 138 private MQTTSubscriptionStrategy subsciptionStrategy; 139 140 public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { 141 this.mqttTransport = mqttTransport; 142 this.brokerService = brokerService; 143 this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService); 144 this.defaultKeepAlive = 0; 145 } 146 147 int generateCommandId() { 148 synchronized (commnadIdMutex) { 149 return lastCommandId++; 150 } 151 } 152 153 public void sendToActiveMQ(Command command, ResponseHandler handler) { 154 155 // Lets intercept message send requests.. 156 if (command instanceof ActiveMQMessage) { 157 ActiveMQMessage msg = (ActiveMQMessage) command; 158 try { 159 if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) { 160 // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 161 // specification requirements for system assigned destinations. 162 if (handler != null) { 163 try { 164 handler.onResponse(this, new Response()); 165 } catch (IOException e) { 166 LOG.warn("Failed to send command " + command, e); 167 } 168 } 169 return; 170 } 171 } catch (IOException e) { 172 LOG.warn("Failed to send command " + command, e); 173 } 174 } 175 176 command.setCommandId(generateCommandId()); 177 if (handler != null) { 178 command.setResponseRequired(true); 179 resposeHandlers.put(command.getCommandId(), handler); 180 } 181 getMQTTTransport().sendToActiveMQ(command); 182 } 183 184 void sendToMQTT(MQTTFrame frame) { 185 try { 186 mqttTransport.sendToMQTT(frame); 187 } catch (IOException e) { 188 LOG.warn("Failed to send frame " + frame, e); 189 } 190 } 191 192 /** 193 * Convert a MQTT command 194 */ 195 public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { 196 switch (frame.messageType()) { 197 case PINGREQ.TYPE: 198 LOG.debug("Received a ping from client: " + getClientId()); 199 sendToMQTT(PING_RESP_FRAME); 200 LOG.debug("Sent Ping Response to " + getClientId()); 201 break; 202 case CONNECT.TYPE: 203 CONNECT connect = new CONNECT().decode(frame); 204 onMQTTConnect(connect); 205 LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version()); 206 break; 207 case DISCONNECT.TYPE: 208 LOG.debug("MQTT Client {} disconnecting", getClientId()); 209 onMQTTDisconnect(); 210 break; 211 case SUBSCRIBE.TYPE: 212 onSubscribe(new SUBSCRIBE().decode(frame)); 213 break; 214 case UNSUBSCRIBE.TYPE: 215 onUnSubscribe(new UNSUBSCRIBE().decode(frame)); 216 break; 217 case PUBLISH.TYPE: 218 onMQTTPublish(new PUBLISH().decode(frame)); 219 break; 220 case PUBACK.TYPE: 221 onMQTTPubAck(new PUBACK().decode(frame)); 222 break; 223 case PUBREC.TYPE: 224 onMQTTPubRec(new PUBREC().decode(frame)); 225 break; 226 case PUBREL.TYPE: 227 onMQTTPubRel(new PUBREL().decode(frame)); 228 break; 229 case PUBCOMP.TYPE: 230 onMQTTPubComp(new PUBCOMP().decode(frame)); 231 break; 232 default: 233 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); 234 } 235 } 236 237 void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { 238 if (connected.get()) { 239 throw new MQTTProtocolException("Already connected."); 240 } 241 this.connect = connect; 242 243 // The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01 244 // (unacceptable protocol level) and then disconnect the Client if the Protocol Level 245 // is not supported by the Server [MQTT-3.1.2-2]. 246 if (connect.version() < 3 || connect.version() > 4) { 247 CONNACK ack = new CONNACK(); 248 ack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION); 249 try { 250 getMQTTTransport().sendToMQTT(ack.encode()); 251 getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null)); 252 } catch (IOException e) { 253 getMQTTTransport().onException(IOExceptionSupport.create(e)); 254 } 255 return; 256 } 257 258 String clientId = ""; 259 if (connect.clientId() != null) { 260 clientId = connect.clientId().toString(); 261 } 262 263 String userName = null; 264 if (connect.userName() != null) { 265 userName = connect.userName().toString(); 266 } 267 String passswd = null; 268 if (connect.password() != null) { 269 passswd = connect.password().toString(); 270 } 271 272 version = connect.version(); 273 274 configureInactivityMonitor(connect.keepAlive()); 275 276 connectionInfo.setConnectionId(connectionId); 277 if (clientId != null && !clientId.isEmpty()) { 278 connectionInfo.setClientId(clientId); 279 } else { 280 // Clean Session MUST be set for 0 length Client Id 281 if (!connect.cleanSession()) { 282 CONNACK ack = new CONNACK(); 283 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 284 try { 285 getMQTTTransport().sendToMQTT(ack.encode()); 286 getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null)); 287 } catch (IOException e) { 288 getMQTTTransport().onException(IOExceptionSupport.create(e)); 289 } 290 return; 291 } 292 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 293 } 294 295 connectionInfo.setResponseRequired(true); 296 connectionInfo.setUserName(userName); 297 connectionInfo.setPassword(passswd); 298 connectionInfo.setTransportContext(mqttTransport.getPeerCertificates()); 299 300 sendToActiveMQ(connectionInfo, new ResponseHandler() { 301 @Override 302 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 303 304 if (response.isException()) { 305 // If the connection attempt fails we close the socket. 306 Throwable exception = ((ExceptionResponse) response).getException(); 307 //let the client know 308 CONNACK ack = new CONNACK(); 309 if (exception instanceof InvalidClientIDException) { 310 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 311 } else if (exception instanceof SecurityException) { 312 ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED); 313 } else if (exception instanceof CredentialException) { 314 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 315 } else { 316 ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); 317 } 318 getMQTTTransport().sendToMQTT(ack.encode()); 319 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 320 return; 321 } 322 323 final SessionInfo sessionInfo = new SessionInfo(sessionId); 324 sendToActiveMQ(sessionInfo, null); 325 326 final ProducerInfo producerInfo = new ProducerInfo(producerId); 327 sendToActiveMQ(producerInfo, new ResponseHandler() { 328 @Override 329 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 330 331 if (response.isException()) { 332 // If the connection attempt fails we close the socket. 333 Throwable exception = ((ExceptionResponse) response).getException(); 334 CONNACK ack = new CONNACK(); 335 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 336 getMQTTTransport().sendToMQTT(ack.encode()); 337 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 338 return; 339 } 340 341 CONNACK ack = new CONNACK(); 342 ack.code(CONNACK.Code.CONNECTION_ACCEPTED); 343 connected.set(true); 344 getMQTTTransport().sendToMQTT(ack.encode()); 345 346 if (connect.cleanSession()) { 347 packetIdGenerator.stopClientSession(getClientId()); 348 } else { 349 packetIdGenerator.startClientSession(getClientId()); 350 } 351 352 findSubscriptionStrategy().onConnect(connect); 353 } 354 }); 355 } 356 }); 357 } 358 359 void onMQTTDisconnect() throws MQTTProtocolException { 360 if (connected.get()) { 361 connected.set(false); 362 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 363 sendToActiveMQ(new ShutdownInfo(), null); 364 } 365 stopTransport(); 366 } 367 368 void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { 369 checkConnected(); 370 LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", 371 command.messageId(), clientId, connectionInfo.getConnectionId()); 372 Topic[] topics = command.topics(); 373 if (topics != null) { 374 byte[] qos = new byte[topics.length]; 375 for (int i = 0; i < topics.length; i++) { 376 try { 377 qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); 378 } catch (IOException e) { 379 throw new MQTTProtocolException("Failed to process subscription request", true, e); 380 } 381 } 382 SUBACK ack = new SUBACK(); 383 ack.messageId(command.messageId()); 384 ack.grantedQos(qos); 385 try { 386 getMQTTTransport().sendToMQTT(ack.encode()); 387 } catch (IOException e) { 388 LOG.warn("Couldn't send SUBACK for " + command, e); 389 } 390 } else { 391 LOG.warn("No topics defined for Subscription " + command); 392 } 393 } 394 395 public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { 396 checkConnected(); 397 if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) { 398 throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS")); 399 } 400 UTF8Buffer[] topics = command.topics(); 401 if (topics != null) { 402 for (UTF8Buffer topic : topics) { 403 try { 404 findSubscriptionStrategy().onUnSubscribe(topic.toString()); 405 } catch (IOException e) { 406 throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); 407 } 408 } 409 } 410 UNSUBACK ack = new UNSUBACK(); 411 ack.messageId(command.messageId()); 412 sendToMQTT(ack.encode()); 413 } 414 415 /** 416 * Dispatch an ActiveMQ command 417 */ 418 public void onActiveMQCommand(Command command) throws Exception { 419 if (command.isResponse()) { 420 Response response = (Response) command; 421 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 422 if (rh != null) { 423 rh.onResponse(this, response); 424 } else { 425 // Pass down any unexpected errors. Should this close the connection? 426 if (response.isException()) { 427 Throwable exception = ((ExceptionResponse) response).getException(); 428 handleException(exception, null); 429 } 430 } 431 } else if (command.isMessageDispatch()) { 432 MessageDispatch md = (MessageDispatch) command; 433 MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId()); 434 if (sub != null) { 435 MessageAck ack = sub.createMessageAck(md); 436 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); 437 switch (publish.qos()) { 438 case AT_LEAST_ONCE: 439 case EXACTLY_ONCE: 440 publish.dup(publish.dup() ? true : md.getMessage().isRedelivered()); 441 case AT_MOST_ONCE: 442 } 443 if (ack != null && sub.expectAck(publish)) { 444 synchronized (consumerAcks) { 445 consumerAcks.put(publish.messageId(), ack); 446 } 447 } 448 LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", 449 publish.messageId(), clientId, connectionInfo.getConnectionId()); 450 getMQTTTransport().sendToMQTT(publish.encode()); 451 if (ack != null && !sub.expectAck(publish)) { 452 getMQTTTransport().sendToActiveMQ(ack); 453 } 454 } 455 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 456 // Pass down any unexpected async errors. Should this close the connection? 457 Throwable exception = ((ConnectionError) command).getException(); 458 handleException(exception, null); 459 } else if (command.isBrokerInfo()) { 460 //ignore 461 } else { 462 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 463 } 464 } 465 466 void onMQTTPublish(PUBLISH command) throws IOException, JMSException { 467 checkConnected(); 468 LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", 469 command.messageId(), clientId, connectionInfo.getConnectionId()); 470 //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet 471 if (containsMqttWildcard(command.topicName().toString())) { 472 // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters 473 getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null)); 474 return; 475 } 476 ActiveMQMessage message = convertMessage(command); 477 message.setProducerId(producerId); 478 message.onSend(); 479 sendToActiveMQ(message, createResponseHandler(command)); 480 } 481 482 void onMQTTPubAck(PUBACK command) { 483 short messageId = command.messageId(); 484 LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", 485 messageId, clientId, connectionInfo.getConnectionId()); 486 packetIdGenerator.ackPacketId(getClientId(), messageId); 487 MessageAck ack; 488 synchronized (consumerAcks) { 489 ack = consumerAcks.remove(messageId); 490 } 491 if (ack != null) { 492 getMQTTTransport().sendToActiveMQ(ack); 493 } 494 } 495 496 void onMQTTPubRec(PUBREC commnand) { 497 //from a subscriber - send a PUBREL in response 498 PUBREL pubrel = new PUBREL(); 499 pubrel.messageId(commnand.messageId()); 500 sendToMQTT(pubrel.encode()); 501 } 502 503 void onMQTTPubRel(PUBREL command) { 504 PUBREC ack; 505 synchronized (publisherRecs) { 506 ack = publisherRecs.remove(command.messageId()); 507 } 508 if (ack == null) { 509 LOG.warn("Unknown PUBREL: {} received", command.messageId()); 510 } 511 PUBCOMP pubcomp = new PUBCOMP(); 512 pubcomp.messageId(command.messageId()); 513 sendToMQTT(pubcomp.encode()); 514 } 515 516 void onMQTTPubComp(PUBCOMP command) { 517 short messageId = command.messageId(); 518 packetIdGenerator.ackPacketId(getClientId(), messageId); 519 MessageAck ack; 520 synchronized (consumerAcks) { 521 ack = consumerAcks.remove(messageId); 522 } 523 if (ack != null) { 524 getMQTTTransport().sendToActiveMQ(ack); 525 } 526 } 527 528 ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { 529 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 530 531 msg.setProducerId(producerId); 532 MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); 533 msg.setMessageId(id); 534 LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 535 command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); 536 msg.setTimestamp(System.currentTimeMillis()); 537 msg.setPriority((byte) Message.DEFAULT_PRIORITY); 538 msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE); 539 msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); 540 if (command.retain()) { 541 msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); 542 } 543 544 ActiveMQDestination destination; 545 synchronized (activeMQDestinationMap) { 546 destination = activeMQDestinationMap.get(command.topicName()); 547 if (destination == null) { 548 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); 549 try { 550 destination = findSubscriptionStrategy().onSend(topicName); 551 } catch (IOException e) { 552 throw JMSExceptionSupport.create(e); 553 } 554 555 activeMQDestinationMap.put(command.topicName().toString(), destination); 556 } 557 } 558 559 msg.setJMSDestination(destination); 560 msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length); 561 return msg; 562 } 563 564 public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { 565 PUBLISH result = new PUBLISH(); 566 // packet id is set in MQTTSubscription 567 QoS qoS; 568 if (message.propertyExists(QOS_PROPERTY_NAME)) { 569 int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); 570 qoS = QoS.values()[ordinal]; 571 572 } else { 573 qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; 574 } 575 result.qos(qoS); 576 if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) { 577 result.retain(true); 578 } 579 580 String topicName; 581 synchronized (mqttTopicMap) { 582 ActiveMQDestination destination = message.getDestination(); 583 if (destination.isPattern() && message.getOriginalDestination() != null) { 584 destination = message.getOriginalDestination(); 585 } 586 topicName = mqttTopicMap.get(destination); 587 if (topicName == null) { 588 String amqTopicName = findSubscriptionStrategy().onSend(destination); 589 topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); 590 mqttTopicMap.put(destination, topicName); 591 } 592 } 593 result.topicName(new UTF8Buffer(topicName)); 594 595 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 596 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); 597 msg.setReadOnlyBody(true); 598 String messageText = msg.getText(); 599 if (messageText != null) { 600 result.payload(new Buffer(messageText.getBytes("UTF-8"))); 601 } 602 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 603 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); 604 msg.setReadOnlyBody(true); 605 byte[] data = new byte[(int) msg.getBodyLength()]; 606 msg.readBytes(data); 607 result.payload(new Buffer(data)); 608 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 609 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 610 msg.setReadOnlyBody(true); 611 Map<String, Object> map = msg.getContentMap(); 612 if (map != null) { 613 result.payload(new Buffer(map.toString().getBytes("UTF-8"))); 614 } 615 } else { 616 ByteSequence byteSequence = message.getContent(); 617 if (byteSequence != null && byteSequence.getLength() > 0) { 618 if (message.isCompressed()) { 619 Inflater inflater = new Inflater(); 620 inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); 621 byte[] data = new byte[4096]; 622 int read; 623 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 624 while ((read = inflater.inflate(data)) != 0) { 625 bytesOut.write(data, 0, read); 626 } 627 byteSequence = bytesOut.toByteSequence(); 628 bytesOut.close(); 629 } 630 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); 631 } 632 } 633 LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 634 result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); 635 return result; 636 } 637 638 public MQTTTransport getMQTTTransport() { 639 return mqttTransport; 640 } 641 642 AtomicBoolean transportErrorHandled = new AtomicBoolean(false); 643 public void onTransportError() { 644 if (transportErrorHandled.compareAndSet(false, true)) { 645 if (connect != null) { 646 if (connected.get()) { 647 if (connect.willTopic() != null && connect.willMessage() != null) { 648 try { 649 PUBLISH publish = new PUBLISH(); 650 publish.topicName(connect.willTopic()); 651 publish.qos(connect.willQos()); 652 publish.messageId(packetIdGenerator.getNextSequenceId(getClientId())); 653 publish.payload(connect.willMessage()); 654 publish.retain(connect.willRetain()); 655 ActiveMQMessage message = convertMessage(publish); 656 message.setProducerId(producerId); 657 message.onSend(); 658 659 sendToActiveMQ(message, null); 660 } catch (Exception e) { 661 LOG.warn("Failed to publish Will Message " + connect.willMessage()); 662 } 663 } 664 // remove connection info 665 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 666 } 667 } 668 } 669 } 670 671 void configureInactivityMonitor(short keepAliveSeconds) { 672 MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); 673 674 // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false, 675 // then ignore configuring it because it won't exist 676 if (monitor == null) { 677 return; 678 } 679 680 // Client has sent a valid CONNECT frame, we can stop the connect checker. 681 monitor.stopConnectChecker(); 682 683 long keepAliveMS = keepAliveSeconds * 1000; 684 685 LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS); 686 687 try { 688 // if we have a default keep-alive value, and the client is trying to turn off keep-alive, 689 690 // we'll observe the server-side configured default value (note, no grace period) 691 if (keepAliveMS == 0 && defaultKeepAlive > 0) { 692 keepAliveMS = defaultKeepAlive; 693 } 694 695 long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); 696 697 monitor.setProtocolConverter(this); 698 monitor.setReadKeepAliveTime(keepAliveMS); 699 monitor.setReadGraceTime(readGracePeriod); 700 monitor.startReadChecker(); 701 702 LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)", 703 new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod }); 704 } catch (Exception ex) { 705 LOG.warn("Failed to start MQTT InactivityMonitor ", ex); 706 } 707 } 708 709 void handleException(Throwable exception, MQTTFrame command) { 710 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 711 LOG.debug("Exception detail", exception); 712 713 if (connected.get() && connectionInfo != null) { 714 connected.set(false); 715 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 716 } 717 stopTransport(); 718 } 719 720 void checkConnected() throws MQTTProtocolException { 721 if (!connected.get()) { 722 throw new MQTTProtocolException("Not connected."); 723 } 724 } 725 726 private void stopTransport() { 727 try { 728 getMQTTTransport().stop(); 729 } catch (Throwable e) { 730 LOG.debug("Failed to stop MQTT transport ", e); 731 } 732 } 733 734 ResponseHandler createResponseHandler(final PUBLISH command) { 735 if (command != null) { 736 return new ResponseHandler() { 737 @Override 738 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 739 if (response.isException()) { 740 Throwable error = ((ExceptionResponse) response).getException(); 741 LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage()); 742 LOG.trace("Error trace: {}", error); 743 } 744 745 switch (command.qos()) { 746 case AT_LEAST_ONCE: 747 PUBACK ack = new PUBACK(); 748 ack.messageId(command.messageId()); 749 LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", 750 command.messageId(), clientId, connectionInfo.getConnectionId()); 751 converter.getMQTTTransport().sendToMQTT(ack.encode()); 752 break; 753 case EXACTLY_ONCE: 754 PUBREC req = new PUBREC(); 755 req.messageId(command.messageId()); 756 synchronized (publisherRecs) { 757 publisherRecs.put(command.messageId(), req); 758 } 759 LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", 760 command.messageId(), clientId, connectionInfo.getConnectionId()); 761 converter.getMQTTTransport().sendToMQTT(req.encode()); 762 break; 763 default: 764 break; 765 } 766 } 767 }; 768 } 769 return null; 770 } 771 772 public long getDefaultKeepAlive() { 773 return defaultKeepAlive; 774 } 775 776 /** 777 * Set the default keep alive time (in milliseconds) that would be used if configured on server side 778 * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame 779 * @param keepAlive the keepAlive in milliseconds 780 */ 781 public void setDefaultKeepAlive(long keepAlive) { 782 this.defaultKeepAlive = keepAlive; 783 } 784 785 public int getActiveMQSubscriptionPrefetch() { 786 return activeMQSubscriptionPrefetch; 787 } 788 789 /** 790 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 791 * The default = 1 792 * 793 * @param activeMQSubscriptionPrefetch 794 * set the prefetch for the corresponding ActiveMQ subscription 795 */ 796 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 797 this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; 798 } 799 800 public MQTTPacketIdGenerator getPacketIdGenerator() { 801 return packetIdGenerator; 802 } 803 804 public void setPublishDollarTopics(boolean publishDollarTopics) { 805 this.publishDollarTopics = publishDollarTopics; 806 } 807 808 public boolean getPublishDollarTopics() { 809 return publishDollarTopics; 810 } 811 812 public ConnectionId getConnectionId() { 813 return connectionId; 814 } 815 816 public SessionId getSessionId() { 817 return sessionId; 818 } 819 820 public boolean isCleanSession() { 821 return this.connect.cleanSession(); 822 } 823 824 public String getSubscriptionStrategy() { 825 return subscriptionStrategyName; 826 } 827 828 public void setSubscriptionStrategy(String name) { 829 this.subscriptionStrategyName = name; 830 } 831 832 public String getClientId() { 833 if (clientId == null) { 834 if (connect != null && connect.clientId() != null) { 835 clientId = connect.clientId().toString(); 836 } else { 837 clientId = ""; 838 } 839 } 840 return clientId; 841 } 842 843 protected boolean containsMqttWildcard(String value) { 844 return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) || 845 value.contains(MULTI_LEVEL_WILDCARD)); 846 } 847 848 protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { 849 if (subsciptionStrategy == null) { 850 synchronized (STRATAGY_FINDER) { 851 if (subsciptionStrategy != null) { 852 return subsciptionStrategy; 853 } 854 855 MQTTSubscriptionStrategy strategy = null; 856 if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) { 857 try { 858 strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName); 859 LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName); 860 if (strategy instanceof BrokerServiceAware) { 861 ((BrokerServiceAware)strategy).setBrokerService(brokerService); 862 } 863 strategy.initialize(this); 864 } catch (Exception e) { 865 throw IOExceptionSupport.create(e); 866 } 867 } else { 868 throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName); 869 } 870 871 this.subsciptionStrategy = strategy; 872 } 873 } 874 return subsciptionStrategy; 875 } 876 877 // for testing 878 public void setSubsciptionStrategy(MQTTSubscriptionStrategy subsciptionStrategy) { 879 this.subsciptionStrategy = subsciptionStrategy; 880 } 881}