001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.zip.DataFormatException; 025import java.util.zip.Inflater; 026 027import javax.jms.Destination; 028import javax.jms.InvalidClientIDException; 029import javax.jms.JMSException; 030import javax.jms.Message; 031import javax.security.auth.login.CredentialException; 032 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.BrokerServiceAware; 035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 036import org.apache.activemq.command.ActiveMQBytesMessage; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQMapMessage; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTextMessage; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionError; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.ConnectionInfo; 045import org.apache.activemq.command.ExceptionResponse; 046import org.apache.activemq.command.MessageAck; 047import org.apache.activemq.command.MessageDispatch; 048import org.apache.activemq.command.MessageId; 049import org.apache.activemq.command.ProducerId; 050import org.apache.activemq.command.ProducerInfo; 051import org.apache.activemq.command.Response; 052import org.apache.activemq.command.SessionId; 053import org.apache.activemq.command.SessionInfo; 054import org.apache.activemq.command.ShutdownInfo; 055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy; 056import org.apache.activemq.util.ByteArrayOutputStream; 057import org.apache.activemq.util.ByteSequence; 058import org.apache.activemq.util.FactoryFinder; 059import org.apache.activemq.util.IOExceptionSupport; 060import org.apache.activemq.util.IdGenerator; 061import org.apache.activemq.util.JMSExceptionSupport; 062import org.apache.activemq.util.LRUCache; 063import org.apache.activemq.util.LongSequenceGenerator; 064import org.fusesource.hawtbuf.Buffer; 065import org.fusesource.hawtbuf.UTF8Buffer; 066import org.fusesource.mqtt.client.QoS; 067import org.fusesource.mqtt.client.Topic; 068import org.fusesource.mqtt.codec.CONNACK; 069import org.fusesource.mqtt.codec.CONNECT; 070import org.fusesource.mqtt.codec.DISCONNECT; 071import org.fusesource.mqtt.codec.MQTTFrame; 072import org.fusesource.mqtt.codec.PINGREQ; 073import org.fusesource.mqtt.codec.PINGRESP; 074import org.fusesource.mqtt.codec.PUBACK; 075import org.fusesource.mqtt.codec.PUBCOMP; 076import org.fusesource.mqtt.codec.PUBLISH; 077import org.fusesource.mqtt.codec.PUBREC; 078import org.fusesource.mqtt.codec.PUBREL; 079import org.fusesource.mqtt.codec.SUBACK; 080import org.fusesource.mqtt.codec.SUBSCRIBE; 081import org.fusesource.mqtt.codec.UNSUBACK; 082import org.fusesource.mqtt.codec.UNSUBSCRIBE; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086public class MQTTProtocolConverter { 087 088 private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); 089 090 public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; 091 public static final int V3_1 = 3; 092 public static final int V3_1_1 = 4; 093 094 public static final String SINGLE_LEVEL_WILDCARD = "+"; 095 public static final String MULTI_LEVEL_WILDCARD = "#"; 096 097 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 098 private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); 099 private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; 100 static final int DEFAULT_CACHE_SIZE = 5000; 101 102 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 103 private final SessionId sessionId = new SessionId(connectionId, -1); 104 private final ProducerId producerId = new ProducerId(sessionId, 1); 105 private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); 106 107 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 108 private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); 109 private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); 110 111 private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); 112 private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); 113 114 private final MQTTTransport mqttTransport; 115 private final BrokerService brokerService; 116 117 private final Object commnadIdMutex = new Object(); 118 private int lastCommandId; 119 private final AtomicBoolean connected = new AtomicBoolean(false); 120 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 121 private CONNECT connect; 122 private String clientId; 123 private long defaultKeepAlive; 124 private int activeMQSubscriptionPrefetch = -1; 125 private final MQTTPacketIdGenerator packetIdGenerator; 126 private boolean publishDollarTopics; 127 128 public int version; 129 130 private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); 131 132 /* 133 * Subscription strategy configuration element. 134 * > mqtt-default-subscriptions 135 * > mqtt-virtual-topic-subscriptions 136 */ 137 private String subscriptionStrategyName = "mqtt-default-subscriptions"; 138 private MQTTSubscriptionStrategy subsciptionStrategy; 139 140 public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { 141 this.mqttTransport = mqttTransport; 142 this.brokerService = brokerService; 143 this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService); 144 this.defaultKeepAlive = 0; 145 } 146 147 int generateCommandId() { 148 synchronized (commnadIdMutex) { 149 return lastCommandId++; 150 } 151 } 152 153 public void sendToActiveMQ(Command command, ResponseHandler handler) { 154 155 // Lets intercept message send requests.. 156 if (command instanceof ActiveMQMessage) { 157 ActiveMQMessage msg = (ActiveMQMessage) command; 158 try { 159 if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) { 160 // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 161 // specification requirements for system assigned destinations. 162 if (handler != null) { 163 try { 164 handler.onResponse(this, new Response()); 165 } catch (IOException e) { 166 e.printStackTrace(); 167 } 168 } 169 return; 170 } 171 } catch (IOException e) { 172 e.printStackTrace(); 173 } 174 } 175 176 command.setCommandId(generateCommandId()); 177 if (handler != null) { 178 command.setResponseRequired(true); 179 resposeHandlers.put(command.getCommandId(), handler); 180 } 181 getMQTTTransport().sendToActiveMQ(command); 182 } 183 184 void sendToMQTT(MQTTFrame frame) { 185 try { 186 mqttTransport.sendToMQTT(frame); 187 } catch (IOException e) { 188 LOG.warn("Failed to send frame " + frame, e); 189 } 190 } 191 192 /** 193 * Convert a MQTT command 194 */ 195 public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { 196 switch (frame.messageType()) { 197 case PINGREQ.TYPE: 198 LOG.debug("Received a ping from client: " + getClientId()); 199 sendToMQTT(PING_RESP_FRAME); 200 LOG.debug("Sent Ping Response to " + getClientId()); 201 break; 202 case CONNECT.TYPE: 203 CONNECT connect = new CONNECT().decode(frame); 204 onMQTTConnect(connect); 205 LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version()); 206 break; 207 case DISCONNECT.TYPE: 208 LOG.debug("MQTT Client {} disconnecting", getClientId()); 209 onMQTTDisconnect(); 210 break; 211 case SUBSCRIBE.TYPE: 212 onSubscribe(new SUBSCRIBE().decode(frame)); 213 break; 214 case UNSUBSCRIBE.TYPE: 215 onUnSubscribe(new UNSUBSCRIBE().decode(frame)); 216 break; 217 case PUBLISH.TYPE: 218 onMQTTPublish(new PUBLISH().decode(frame)); 219 break; 220 case PUBACK.TYPE: 221 onMQTTPubAck(new PUBACK().decode(frame)); 222 break; 223 case PUBREC.TYPE: 224 onMQTTPubRec(new PUBREC().decode(frame)); 225 break; 226 case PUBREL.TYPE: 227 onMQTTPubRel(new PUBREL().decode(frame)); 228 break; 229 case PUBCOMP.TYPE: 230 onMQTTPubComp(new PUBCOMP().decode(frame)); 231 break; 232 default: 233 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); 234 } 235 } 236 237 void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { 238 if (connected.get()) { 239 throw new MQTTProtocolException("Already connected."); 240 } 241 this.connect = connect; 242 243 String clientId = ""; 244 if (connect.clientId() != null) { 245 clientId = connect.clientId().toString(); 246 } 247 248 String userName = null; 249 if (connect.userName() != null) { 250 userName = connect.userName().toString(); 251 } 252 String passswd = null; 253 if (connect.password() != null) { 254 passswd = connect.password().toString(); 255 } 256 257 version = connect.version(); 258 259 configureInactivityMonitor(connect.keepAlive()); 260 261 connectionInfo.setConnectionId(connectionId); 262 if (clientId != null && !clientId.isEmpty()) { 263 connectionInfo.setClientId(clientId); 264 } else { 265 // Clean Session MUST be set for 0 length Client Id 266 if (!connect.cleanSession()) { 267 CONNACK ack = new CONNACK(); 268 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 269 try { 270 getMQTTTransport().sendToMQTT(ack.encode()); 271 getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null)); 272 } catch (IOException e) { 273 getMQTTTransport().onException(IOExceptionSupport.create(e)); 274 } 275 return; 276 } 277 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 278 } 279 280 connectionInfo.setResponseRequired(true); 281 connectionInfo.setUserName(userName); 282 connectionInfo.setPassword(passswd); 283 connectionInfo.setTransportContext(mqttTransport.getPeerCertificates()); 284 285 sendToActiveMQ(connectionInfo, new ResponseHandler() { 286 @Override 287 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 288 289 if (response.isException()) { 290 // If the connection attempt fails we close the socket. 291 Throwable exception = ((ExceptionResponse) response).getException(); 292 //let the client know 293 CONNACK ack = new CONNACK(); 294 if (exception instanceof InvalidClientIDException) { 295 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 296 } else if (exception instanceof SecurityException) { 297 ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED); 298 } else if (exception instanceof CredentialException) { 299 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 300 } else { 301 ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); 302 } 303 getMQTTTransport().sendToMQTT(ack.encode()); 304 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 305 return; 306 } 307 308 final SessionInfo sessionInfo = new SessionInfo(sessionId); 309 sendToActiveMQ(sessionInfo, null); 310 311 final ProducerInfo producerInfo = new ProducerInfo(producerId); 312 sendToActiveMQ(producerInfo, new ResponseHandler() { 313 @Override 314 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 315 316 if (response.isException()) { 317 // If the connection attempt fails we close the socket. 318 Throwable exception = ((ExceptionResponse) response).getException(); 319 CONNACK ack = new CONNACK(); 320 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 321 getMQTTTransport().sendToMQTT(ack.encode()); 322 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 323 return; 324 } 325 326 CONNACK ack = new CONNACK(); 327 ack.code(CONNACK.Code.CONNECTION_ACCEPTED); 328 connected.set(true); 329 getMQTTTransport().sendToMQTT(ack.encode()); 330 331 if (connect.cleanSession()) { 332 packetIdGenerator.stopClientSession(getClientId()); 333 } else { 334 packetIdGenerator.startClientSession(getClientId()); 335 } 336 337 findSubscriptionStrategy().onConnect(connect); 338 } 339 }); 340 } 341 }); 342 } 343 344 void onMQTTDisconnect() throws MQTTProtocolException { 345 if (connected.get()) { 346 connected.set(false); 347 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 348 sendToActiveMQ(new ShutdownInfo(), null); 349 } 350 stopTransport(); 351 } 352 353 void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { 354 checkConnected(); 355 LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", 356 command.messageId(), clientId, connectionInfo.getConnectionId()); 357 Topic[] topics = command.topics(); 358 if (topics != null) { 359 byte[] qos = new byte[topics.length]; 360 for (int i = 0; i < topics.length; i++) { 361 try { 362 qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); 363 } catch (IOException e) { 364 throw new MQTTProtocolException("Failed to process subscription request", true, e); 365 } 366 } 367 SUBACK ack = new SUBACK(); 368 ack.messageId(command.messageId()); 369 ack.grantedQos(qos); 370 try { 371 getMQTTTransport().sendToMQTT(ack.encode()); 372 } catch (IOException e) { 373 LOG.warn("Couldn't send SUBACK for " + command, e); 374 } 375 } else { 376 LOG.warn("No topics defined for Subscription " + command); 377 } 378 } 379 380 public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { 381 checkConnected(); 382 if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) { 383 throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS")); 384 } 385 UTF8Buffer[] topics = command.topics(); 386 if (topics != null) { 387 for (UTF8Buffer topic : topics) { 388 try { 389 findSubscriptionStrategy().onUnSubscribe(topic.toString()); 390 } catch (IOException e) { 391 throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); 392 } 393 } 394 } 395 UNSUBACK ack = new UNSUBACK(); 396 ack.messageId(command.messageId()); 397 sendToMQTT(ack.encode()); 398 } 399 400 /** 401 * Dispatch an ActiveMQ command 402 */ 403 public void onActiveMQCommand(Command command) throws Exception { 404 if (command.isResponse()) { 405 Response response = (Response) command; 406 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 407 if (rh != null) { 408 rh.onResponse(this, response); 409 } else { 410 // Pass down any unexpected errors. Should this close the connection? 411 if (response.isException()) { 412 Throwable exception = ((ExceptionResponse) response).getException(); 413 handleException(exception, null); 414 } 415 } 416 } else if (command.isMessageDispatch()) { 417 MessageDispatch md = (MessageDispatch) command; 418 MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId()); 419 if (sub != null) { 420 MessageAck ack = sub.createMessageAck(md); 421 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); 422 switch (publish.qos()) { 423 case AT_LEAST_ONCE: 424 case EXACTLY_ONCE: 425 publish.dup(publish.dup() ? true : md.getMessage().isRedelivered()); 426 case AT_MOST_ONCE: 427 } 428 if (ack != null && sub.expectAck(publish)) { 429 synchronized (consumerAcks) { 430 consumerAcks.put(publish.messageId(), ack); 431 } 432 } 433 LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", 434 publish.messageId(), clientId, connectionInfo.getConnectionId()); 435 getMQTTTransport().sendToMQTT(publish.encode()); 436 if (ack != null && !sub.expectAck(publish)) { 437 getMQTTTransport().sendToActiveMQ(ack); 438 } 439 } 440 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 441 // Pass down any unexpected async errors. Should this close the connection? 442 Throwable exception = ((ConnectionError) command).getException(); 443 handleException(exception, null); 444 } else if (command.isBrokerInfo()) { 445 //ignore 446 } else { 447 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 448 } 449 } 450 451 void onMQTTPublish(PUBLISH command) throws IOException, JMSException { 452 checkConnected(); 453 LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", 454 command.messageId(), clientId, connectionInfo.getConnectionId()); 455 //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet 456 if (containsMqttWildcard(command.topicName().toString())) { 457 // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters 458 getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null)); 459 return; 460 } 461 ActiveMQMessage message = convertMessage(command); 462 message.setProducerId(producerId); 463 message.onSend(); 464 sendToActiveMQ(message, createResponseHandler(command)); 465 } 466 467 void onMQTTPubAck(PUBACK command) { 468 short messageId = command.messageId(); 469 LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", 470 messageId, clientId, connectionInfo.getConnectionId()); 471 packetIdGenerator.ackPacketId(getClientId(), messageId); 472 MessageAck ack; 473 synchronized (consumerAcks) { 474 ack = consumerAcks.remove(messageId); 475 } 476 if (ack != null) { 477 getMQTTTransport().sendToActiveMQ(ack); 478 } 479 } 480 481 void onMQTTPubRec(PUBREC commnand) { 482 //from a subscriber - send a PUBREL in response 483 PUBREL pubrel = new PUBREL(); 484 pubrel.messageId(commnand.messageId()); 485 sendToMQTT(pubrel.encode()); 486 } 487 488 void onMQTTPubRel(PUBREL command) { 489 PUBREC ack; 490 synchronized (publisherRecs) { 491 ack = publisherRecs.remove(command.messageId()); 492 } 493 if (ack == null) { 494 LOG.warn("Unknown PUBREL: {} received", command.messageId()); 495 } 496 PUBCOMP pubcomp = new PUBCOMP(); 497 pubcomp.messageId(command.messageId()); 498 sendToMQTT(pubcomp.encode()); 499 } 500 501 void onMQTTPubComp(PUBCOMP command) { 502 short messageId = command.messageId(); 503 packetIdGenerator.ackPacketId(getClientId(), messageId); 504 MessageAck ack; 505 synchronized (consumerAcks) { 506 ack = consumerAcks.remove(messageId); 507 } 508 if (ack != null) { 509 getMQTTTransport().sendToActiveMQ(ack); 510 } 511 } 512 513 ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { 514 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 515 516 msg.setProducerId(producerId); 517 MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); 518 msg.setMessageId(id); 519 LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 520 command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); 521 msg.setTimestamp(System.currentTimeMillis()); 522 msg.setPriority((byte) Message.DEFAULT_PRIORITY); 523 msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); 524 msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); 525 if (command.retain()) { 526 msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); 527 } 528 529 ActiveMQDestination destination; 530 synchronized (activeMQDestinationMap) { 531 destination = activeMQDestinationMap.get(command.topicName()); 532 if (destination == null) { 533 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); 534 try { 535 destination = findSubscriptionStrategy().onSend(topicName); 536 } catch (IOException e) { 537 throw JMSExceptionSupport.create(e); 538 } 539 540 activeMQDestinationMap.put(command.topicName().toString(), destination); 541 } 542 } 543 544 msg.setJMSDestination(destination); 545 msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length); 546 return msg; 547 } 548 549 public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { 550 PUBLISH result = new PUBLISH(); 551 // packet id is set in MQTTSubscription 552 QoS qoS; 553 if (message.propertyExists(QOS_PROPERTY_NAME)) { 554 int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); 555 qoS = QoS.values()[ordinal]; 556 557 } else { 558 qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; 559 } 560 result.qos(qoS); 561 if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) { 562 result.retain(true); 563 } 564 565 String topicName; 566 synchronized (mqttTopicMap) { 567 topicName = mqttTopicMap.get(message.getJMSDestination()); 568 if (topicName == null) { 569 String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); 570 topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); 571 mqttTopicMap.put(message.getJMSDestination(), topicName); 572 } 573 } 574 result.topicName(new UTF8Buffer(topicName)); 575 576 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 577 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); 578 msg.setReadOnlyBody(true); 579 String messageText = msg.getText(); 580 if (messageText != null) { 581 result.payload(new Buffer(messageText.getBytes("UTF-8"))); 582 } 583 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 584 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); 585 msg.setReadOnlyBody(true); 586 byte[] data = new byte[(int) msg.getBodyLength()]; 587 msg.readBytes(data); 588 result.payload(new Buffer(data)); 589 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 590 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 591 msg.setReadOnlyBody(true); 592 Map<String, Object> map = msg.getContentMap(); 593 if (map != null) { 594 result.payload(new Buffer(map.toString().getBytes("UTF-8"))); 595 } 596 } else { 597 ByteSequence byteSequence = message.getContent(); 598 if (byteSequence != null && byteSequence.getLength() > 0) { 599 if (message.isCompressed()) { 600 Inflater inflater = new Inflater(); 601 inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); 602 byte[] data = new byte[4096]; 603 int read; 604 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 605 while ((read = inflater.inflate(data)) != 0) { 606 bytesOut.write(data, 0, read); 607 } 608 byteSequence = bytesOut.toByteSequence(); 609 bytesOut.close(); 610 } 611 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); 612 } 613 } 614 LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 615 result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); 616 return result; 617 } 618 619 public MQTTTransport getMQTTTransport() { 620 return mqttTransport; 621 } 622 623 boolean willSent = false; 624 public void onTransportError() { 625 if (connect != null) { 626 if (connected.get()) { 627 if (connect.willTopic() != null && connect.willMessage() != null && !willSent) { 628 willSent = true; 629 try { 630 PUBLISH publish = new PUBLISH(); 631 publish.topicName(connect.willTopic()); 632 publish.qos(connect.willQos()); 633 publish.messageId(packetIdGenerator.getNextSequenceId(getClientId())); 634 publish.payload(connect.willMessage()); 635 publish.retain(connect.willRetain()); 636 ActiveMQMessage message = convertMessage(publish); 637 message.setProducerId(producerId); 638 message.onSend(); 639 640 sendToActiveMQ(message, null); 641 } catch (Exception e) { 642 LOG.warn("Failed to publish Will Message " + connect.willMessage()); 643 } 644 } 645 // remove connection info 646 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 647 } 648 } 649 } 650 651 void configureInactivityMonitor(short keepAliveSeconds) { 652 MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); 653 654 // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false, 655 // then ignore configuring it because it won't exist 656 if (monitor == null) { 657 return; 658 } 659 660 // Client has sent a valid CONNECT frame, we can stop the connect checker. 661 monitor.stopConnectChecker(); 662 663 long keepAliveMS = keepAliveSeconds * 1000; 664 665 LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS); 666 667 try { 668 // if we have a default keep-alive value, and the client is trying to turn off keep-alive, 669 670 // we'll observe the server-side configured default value (note, no grace period) 671 if (keepAliveMS == 0 && defaultKeepAlive > 0) { 672 keepAliveMS = defaultKeepAlive; 673 } 674 675 long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); 676 677 monitor.setProtocolConverter(this); 678 monitor.setReadKeepAliveTime(keepAliveMS); 679 monitor.setReadGraceTime(readGracePeriod); 680 monitor.startReadChecker(); 681 682 LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)", 683 new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod }); 684 } catch (Exception ex) { 685 LOG.warn("Failed to start MQTT InactivityMonitor ", ex); 686 } 687 } 688 689 void handleException(Throwable exception, MQTTFrame command) { 690 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 691 LOG.debug("Exception detail", exception); 692 693 if (connected.get() && connectionInfo != null) { 694 connected.set(false); 695 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 696 } 697 stopTransport(); 698 } 699 700 void checkConnected() throws MQTTProtocolException { 701 if (!connected.get()) { 702 throw new MQTTProtocolException("Not connected."); 703 } 704 } 705 706 private void stopTransport() { 707 try { 708 getMQTTTransport().stop(); 709 } catch (Throwable e) { 710 LOG.debug("Failed to stop MQTT transport ", e); 711 } 712 } 713 714 ResponseHandler createResponseHandler(final PUBLISH command) { 715 if (command != null) { 716 return new ResponseHandler() { 717 @Override 718 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 719 if (response.isException()) { 720 Throwable error = ((ExceptionResponse) response).getException(); 721 LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage()); 722 LOG.trace("Error trace: {}", error); 723 } 724 725 switch (command.qos()) { 726 case AT_LEAST_ONCE: 727 PUBACK ack = new PUBACK(); 728 ack.messageId(command.messageId()); 729 LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", 730 command.messageId(), clientId, connectionInfo.getConnectionId()); 731 converter.getMQTTTransport().sendToMQTT(ack.encode()); 732 break; 733 case EXACTLY_ONCE: 734 PUBREC req = new PUBREC(); 735 req.messageId(command.messageId()); 736 synchronized (publisherRecs) { 737 publisherRecs.put(command.messageId(), req); 738 } 739 LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", 740 command.messageId(), clientId, connectionInfo.getConnectionId()); 741 converter.getMQTTTransport().sendToMQTT(req.encode()); 742 break; 743 default: 744 break; 745 } 746 } 747 }; 748 } 749 return null; 750 } 751 752 public long getDefaultKeepAlive() { 753 return defaultKeepAlive; 754 } 755 756 /** 757 * Set the default keep alive time (in milliseconds) that would be used if configured on server side 758 * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame 759 * @param keepAlive the keepAlive in milliseconds 760 */ 761 public void setDefaultKeepAlive(long keepAlive) { 762 this.defaultKeepAlive = keepAlive; 763 } 764 765 public int getActiveMQSubscriptionPrefetch() { 766 return activeMQSubscriptionPrefetch; 767 } 768 769 /** 770 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 771 * The default = 1 772 * 773 * @param activeMQSubscriptionPrefetch 774 * set the prefetch for the corresponding ActiveMQ subscription 775 */ 776 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 777 this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; 778 } 779 780 public MQTTPacketIdGenerator getPacketIdGenerator() { 781 return packetIdGenerator; 782 } 783 784 public void setPublishDollarTopics(boolean publishDollarTopics) { 785 this.publishDollarTopics = publishDollarTopics; 786 } 787 788 public boolean getPublishDollarTopics() { 789 return publishDollarTopics; 790 } 791 792 public ConnectionId getConnectionId() { 793 return connectionId; 794 } 795 796 public SessionId getSessionId() { 797 return sessionId; 798 } 799 800 public boolean isCleanSession() { 801 return this.connect.cleanSession(); 802 } 803 804 public String getSubscriptionStrategy() { 805 return subscriptionStrategyName; 806 } 807 808 public void setSubscriptionStrategy(String name) { 809 this.subscriptionStrategyName = name; 810 } 811 812 public String getClientId() { 813 if (clientId == null) { 814 if (connect != null && connect.clientId() != null) { 815 clientId = connect.clientId().toString(); 816 } else { 817 clientId = ""; 818 } 819 } 820 return clientId; 821 } 822 823 protected boolean containsMqttWildcard(String value) { 824 return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) || 825 value.contains(MULTI_LEVEL_WILDCARD)); 826 } 827 828 protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { 829 if (subsciptionStrategy == null) { 830 synchronized (STRATAGY_FINDER) { 831 if (subsciptionStrategy != null) { 832 return subsciptionStrategy; 833 } 834 835 MQTTSubscriptionStrategy strategy = null; 836 if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) { 837 try { 838 strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName); 839 LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName); 840 if (strategy instanceof BrokerServiceAware) { 841 ((BrokerServiceAware)strategy).setBrokerService(brokerService); 842 } 843 strategy.initialize(this); 844 } catch (Exception e) { 845 throw IOExceptionSupport.create(e); 846 } 847 } else { 848 throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName); 849 } 850 851 this.subsciptionStrategy = strategy; 852 } 853 } 854 return subsciptionStrategy; 855 } 856}