001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import org.fusesource.mqtt.codec.CONNECT; 020import org.fusesource.mqtt.codec.DISCONNECT; 021import org.fusesource.mqtt.codec.PINGREQ; 022import org.fusesource.mqtt.codec.PUBACK; 023import org.fusesource.mqtt.codec.PUBCOMP; 024import org.fusesource.mqtt.codec.PUBLISH; 025import org.fusesource.mqtt.codec.PUBREC; 026import org.fusesource.mqtt.codec.PUBREL; 027import org.fusesource.mqtt.codec.SUBSCRIBE; 028import org.fusesource.mqtt.codec.UNSUBSCRIBE; 029 030/** 031 * A set of static methods useful for handling MQTT based client connections. 032 */ 033public class MQTTProtocolSupport { 034 035 /** 036 * Converts an MQTT formatted Topic name into a suitable ActiveMQ Destination 037 * name string. 038 * 039 * @param name 040 * the MQTT formatted topic name. 041 * 042 * @return an destination name that fits the ActiveMQ conventions. 043 */ 044 public static String convertMQTTToActiveMQ(String name) { 045 char[] chars = name.toCharArray(); 046 for (int i = 0; i < chars.length; i++) { 047 switch(chars[i]) { 048 case '#': 049 chars[i] = '>'; 050 break; 051 case '>': 052 chars[i] = '#'; 053 break; 054 case '+': 055 chars[i] = '*'; 056 break; 057 case '*': 058 chars[i] = '+'; 059 break; 060 case '/': 061 chars[i] = '.'; 062 break; 063 case '.': 064 chars[i] = '/'; 065 break; 066 } 067 } 068 String rc = new String(chars); 069 return rc; 070 } 071 072 /** 073 * Converts an ActiveMQ destination name into a correctly formatted 074 * MQTT destination name. 075 * 076 * @param destinationName 077 * the ActiveMQ destination name to process. 078 * 079 * @return a destination name formatted for MQTT. 080 */ 081 public static String convertActiveMQToMQTT(String destinationName) { 082 char[] chars = destinationName.toCharArray(); 083 for (int i = 0; i < chars.length; i++) { 084 switch(chars[i]) { 085 case '>': 086 chars[i] = '#'; 087 break; 088 case '#': 089 chars[i] = '>'; 090 break; 091 case '*': 092 chars[i] = '+'; 093 break; 094 case '+': 095 chars[i] = '*'; 096 break; 097 case '.': 098 chars[i] = '/'; 099 break; 100 case '/': 101 chars[i] = '.'; 102 break; 103 } 104 } 105 String rc = new String(chars); 106 return rc; 107 } 108 109 /** 110 * Given an MQTT header byte, determine the command type that the header 111 * represents. 112 * 113 * @param header 114 * the byte value for the MQTT frame header. 115 * 116 * @return a string value for the given command type. 117 */ 118 public static String commandType(byte header) { 119 byte messageType = (byte) ((header & 0xF0) >>> 4); 120 switch (messageType) { 121 case PINGREQ.TYPE: 122 return "PINGREQ"; 123 case CONNECT.TYPE: 124 return "CONNECT"; 125 case DISCONNECT.TYPE: 126 return "DISCONNECT"; 127 case SUBSCRIBE.TYPE: 128 return "SUBSCRIBE"; 129 case UNSUBSCRIBE.TYPE: 130 return "UNSUBSCRIBE"; 131 case PUBLISH.TYPE: 132 return "PUBLISH"; 133 case PUBACK.TYPE: 134 return "PUBACK"; 135 case PUBREC.TYPE: 136 return "PUBREC"; 137 case PUBREL.TYPE: 138 return "PUBREL"; 139 case PUBCOMP.TYPE: 140 return "PUBCOMP"; 141 default: 142 return "UNKNOWN"; 143 } 144 } 145}