001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.security.cert.X509Certificate; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import javax.jms.JMSException; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.command.Command; 027import org.apache.activemq.transport.Transport; 028import org.apache.activemq.transport.TransportFilter; 029import org.apache.activemq.transport.TransportListener; 030import org.apache.activemq.transport.nio.NIOSSLTransport; 031import org.apache.activemq.transport.tcp.SslTransport; 032import org.apache.activemq.util.IOExceptionSupport; 033import org.apache.activemq.wireformat.WireFormat; 034import org.fusesource.mqtt.codec.CONNACK; 035import org.fusesource.mqtt.codec.CONNECT; 036import org.fusesource.mqtt.codec.DISCONNECT; 037import org.fusesource.mqtt.codec.MQTTFrame; 038import org.fusesource.mqtt.codec.PINGREQ; 039import org.fusesource.mqtt.codec.PINGRESP; 040import org.fusesource.mqtt.codec.PUBACK; 041import org.fusesource.mqtt.codec.PUBCOMP; 042import org.fusesource.mqtt.codec.PUBLISH; 043import org.fusesource.mqtt.codec.PUBREC; 044import org.fusesource.mqtt.codec.PUBREL; 045import org.fusesource.mqtt.codec.SUBACK; 046import org.fusesource.mqtt.codec.SUBSCRIBE; 047import org.fusesource.mqtt.codec.UNSUBSCRIBE; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * The MQTTTransportFilter normally sits on top of a TcpTransport that has been 053 * configured with the StompWireFormat and is used to convert MQTT commands to 054 * ActiveMQ commands. All of the conversion work is done by delegating to the 055 * MQTTProtocolConverter 056 */ 057public class MQTTTransportFilter extends TransportFilter implements MQTTTransport { 058 private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class); 059 private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO"); 060 private final MQTTProtocolConverter protocolConverter; 061 private MQTTInactivityMonitor monitor; 062 private MQTTWireFormat wireFormat; 063 private final AtomicBoolean stopped = new AtomicBoolean(); 064 065 private boolean trace; 066 private final Object sendLock = new Object(); 067 068 public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) { 069 super(next); 070 this.protocolConverter = new MQTTProtocolConverter(this, brokerService); 071 072 if (wireFormat instanceof MQTTWireFormat) { 073 this.wireFormat = (MQTTWireFormat) wireFormat; 074 } 075 } 076 077 @Override 078 public void oneway(Object o) throws IOException { 079 try { 080 final Command command = (Command) o; 081 protocolConverter.onActiveMQCommand(command); 082 } catch (Exception e) { 083 throw IOExceptionSupport.create(e); 084 } 085 } 086 087 @Override 088 public void onCommand(Object command) { 089 try { 090 MQTTFrame frame = (MQTTFrame) command; 091 if (trace) { 092 TRACE.trace("Received: " + toString(frame)); 093 } 094 protocolConverter.onMQTTCommand(frame); 095 } catch (IOException e) { 096 onException(e); 097 } catch (JMSException e) { 098 onException(IOExceptionSupport.create(e)); 099 } 100 } 101 102 @Override 103 public void sendToActiveMQ(Command command) { 104 TransportListener l = transportListener; 105 if (l != null) { 106 l.onCommand(command); 107 } 108 } 109 110 @Override 111 public void sendToMQTT(MQTTFrame command) throws IOException { 112 if( !stopped.get() ) { 113 if (trace) { 114 TRACE.trace("Sending : " + toString(command)); 115 } 116 Transport n = next; 117 if (n != null) { 118 // sync access to underlying transport buffer 119 synchronized (sendLock) { 120 n.oneway(command); 121 } 122 } 123 } 124 } 125 126 static private String toString(MQTTFrame frame) { 127 if( frame == null ) 128 return null; 129 try { 130 switch (frame.messageType()) { 131 case PINGREQ.TYPE: return new PINGREQ().decode(frame).toString(); 132 case PINGRESP.TYPE: return new PINGRESP().decode(frame).toString(); 133 case CONNECT.TYPE: return new CONNECT().decode(frame).toString(); 134 case DISCONNECT.TYPE: return new DISCONNECT().decode(frame).toString(); 135 case SUBSCRIBE.TYPE: return new SUBSCRIBE().decode(frame).toString(); 136 case UNSUBSCRIBE.TYPE: return new UNSUBSCRIBE().decode(frame).toString(); 137 case PUBLISH.TYPE: return new PUBLISH().decode(frame).toString(); 138 case PUBACK.TYPE: return new PUBACK().decode(frame).toString(); 139 case PUBREC.TYPE: return new PUBREC().decode(frame).toString(); 140 case PUBREL.TYPE: return new PUBREL().decode(frame).toString(); 141 case PUBCOMP.TYPE: return new PUBCOMP().decode(frame).toString(); 142 case CONNACK.TYPE: return new CONNACK().decode(frame).toString(); 143 case SUBACK.TYPE: return new SUBACK().decode(frame).toString(); 144 default: return frame.toString(); 145 } 146 } catch (Throwable e) { 147 e.printStackTrace(); 148 return frame.toString(); 149 } 150 } 151 152 @Override 153 public void start() throws Exception { 154 if (monitor != null) { 155 monitor.startConnectChecker(getConnectAttemptTimeout()); 156 } 157 super.start(); 158 } 159 160 @Override 161 public void stop() throws Exception { 162 if (stopped.compareAndSet(false, true)) { 163 super.stop(); 164 } 165 } 166 167 @Override 168 public X509Certificate[] getPeerCertificates() { 169 X509Certificate[] peerCerts = null; 170 if (next instanceof SslTransport) { 171 peerCerts = ((SslTransport) next).getPeerCertificates(); 172 } 173 if (next instanceof NIOSSLTransport) { 174 peerCerts = ((NIOSSLTransport)next).getPeerCertificates(); 175 } 176 if (trace && peerCerts != null) { 177 LOG.debug("Peer Identity has been verified\n"); 178 } 179 return peerCerts; 180 } 181 182 public boolean isTrace() { 183 return trace; 184 } 185 186 public void setTrace(boolean trace) { 187 this.trace = trace; 188 } 189 190 @Override 191 public MQTTInactivityMonitor getInactivityMonitor() { 192 return monitor; 193 } 194 195 public void setInactivityMonitor(MQTTInactivityMonitor monitor) { 196 this.monitor = monitor; 197 } 198 199 @Override 200 public MQTTWireFormat getWireFormat() { 201 return this.wireFormat; 202 } 203 204 @Override 205 public void onException(IOException error) { 206 protocolConverter.onTransportError(); 207 super.onException(error); 208 } 209 210 public long getDefaultKeepAlive() { 211 return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1; 212 } 213 214 public void setDefaultKeepAlive(long defaultHeartBeat) { 215 protocolConverter.setDefaultKeepAlive(defaultHeartBeat); 216 } 217 218 /** 219 * @return the timeout value used to fail a connection if no CONNECT frame read. 220 */ 221 public long getConnectAttemptTimeout() { 222 return wireFormat.getConnectAttemptTimeout(); 223 } 224 225 /** 226 * Sets the timeout value used to fail a connection if no CONNECT frame is read 227 * in the given interval. 228 * 229 * @param connectTimeout 230 * the connection frame received timeout value. 231 */ 232 public void setConnectAttemptTimeout(long connectTimeout) { 233 wireFormat.setConnectAttemptTimeout(connectTimeout); 234 } 235 236 public boolean getPublishDollarTopics() { 237 return protocolConverter != null && protocolConverter.getPublishDollarTopics(); 238 } 239 240 public void setPublishDollarTopics(boolean publishDollarTopics) { 241 protocolConverter.setPublishDollarTopics(publishDollarTopics); 242 } 243 244 public String getSubscriptionStrategy() { 245 return protocolConverter != null ? protocolConverter.getSubscriptionStrategy() : "default"; 246 } 247 248 public void setSubscriptionStrategy(String name) { 249 protocolConverter.setSubscriptionStrategy(name); 250 } 251 252 public int getActiveMQSubscriptionPrefetch() { 253 return protocolConverter.getActiveMQSubscriptionPrefetch(); 254 } 255 256 /** 257 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 258 * The default = 1 259 * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription 260 */ 261 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 262 protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch); 263 } 264 265 /** 266 * @return the maximum number of bytes a single MQTT message frame is allowed to be. 267 */ 268 public int getMaxFrameSize() { 269 return wireFormat.getMaxFrameSize(); 270 } 271 272 /** 273 * Sets the maximum frame size for an incoming MQTT frame. The protocl limit is 274 * 256 megabytes and this value cannot be set higher. 275 * 276 * @param maxFrameSize 277 * the maximum allowed frame size for a single MQTT frame. 278 */ 279 public void setMaxFrameSize(int maxFrameSize) { 280 wireFormat.setMaxFrameSize(maxFrameSize); 281 } 282 283 @Override 284 public void setPeerCertificates(X509Certificate[] certificates) {} 285}