001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.transport.mqtt; 019 020import java.io.IOException; 021import java.util.Timer; 022import java.util.concurrent.RejectedExecutionException; 023import java.util.concurrent.SynchronousQueue; 024import java.util.concurrent.ThreadFactory; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.concurrent.locks.ReentrantLock; 030 031import org.apache.activemq.thread.SchedulerTimerTask; 032import org.apache.activemq.transport.AbstractInactivityMonitor; 033import org.apache.activemq.transport.InactivityIOException; 034import org.apache.activemq.transport.Transport; 035import org.apache.activemq.transport.TransportFilter; 036import org.apache.activemq.wireformat.WireFormat; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040public class MQTTInactivityMonitor extends TransportFilter { 041 042 private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class); 043 044 private static final long DEFAULT_CHECK_TIME_MILLS = 30000; 045 046 private static ThreadPoolExecutor ASYNC_TASKS; 047 private static int CHECKER_COUNTER; 048 private static Timer READ_CHECK_TIMER; 049 050 private final AtomicBoolean failed = new AtomicBoolean(false); 051 private final AtomicBoolean inReceive = new AtomicBoolean(false); 052 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 053 054 private final ReentrantLock sendLock = new ReentrantLock(); 055 private SchedulerTimerTask readCheckerTask; 056 057 private long readGraceTime = DEFAULT_CHECK_TIME_MILLS; 058 private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS; 059 private MQTTProtocolConverter protocolConverter; 060 061 private long connectionTimeout = MQTTWireFormat.DEFAULT_CONNECTION_TIMEOUT; 062 private SchedulerTimerTask connectCheckerTask; 063 private final Runnable connectChecker = new Runnable() { 064 065 private final long startTime = System.currentTimeMillis(); 066 067 @Override 068 public void run() { 069 070 long now = System.currentTimeMillis(); 071 072 if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) { 073 if (LOG.isDebugEnabled()) { 074 LOG.debug("No CONNECT frame received in time for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException."); 075 } 076 077 try { 078 ASYNC_TASKS.execute(new Runnable() { 079 @Override 080 public void run() { 081 onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: " 082 + next.getRemoteAddress())); 083 } 084 }); 085 } catch (RejectedExecutionException ex) { 086 if (!ASYNC_TASKS.isShutdown()) { 087 LOG.error("Async connection timeout task was rejected from the executor: ", ex); 088 throw ex; 089 } 090 } 091 } 092 } 093 }; 094 095 private final Runnable readChecker = new Runnable() { 096 long lastReceiveTime = System.currentTimeMillis(); 097 098 @Override 099 public void run() { 100 101 long now = System.currentTimeMillis(); 102 int currentCounter = next.getReceiveCounter(); 103 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 104 105 // for the PINGREQ/RESP frames, the currentCounter will be different 106 // from previousCounter, and that 107 // should be sufficient to indicate the connection is still alive. 108 // If there were random data, or something 109 // outside the scope of the spec, the wire format unrmarshalling 110 // would fail, so we don't need to handle 111 // PINGREQ/RESP explicitly here 112 if (inReceive.get() || currentCounter != previousCounter) { 113 if (LOG.isTraceEnabled()) { 114 LOG.trace("Command received since last read check."); 115 } 116 lastReceiveTime = now; 117 return; 118 } 119 120 if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && readCheckerTask != null && !ASYNC_TASKS.isShutdown()) { 121 if (LOG.isDebugEnabled()) { 122 LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException."); 123 } 124 try { 125 ASYNC_TASKS.execute(new Runnable() { 126 @Override 127 public void run() { 128 onException(new InactivityIOException("Channel was inactive for too (>" + 129 (connectionTimeout) + ") long: " + next.getRemoteAddress())); 130 } 131 }); 132 } catch (RejectedExecutionException ex) { 133 if (!ASYNC_TASKS.isShutdown()) { 134 LOG.error("Async connection timeout task was rejected from the executor: ", ex); 135 throw ex; 136 } 137 } 138 } 139 } 140 }; 141 142 public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) { 143 super(next); 144 } 145 146 @Override 147 public void start() throws Exception { 148 next.start(); 149 } 150 151 @Override 152 public void stop() throws Exception { 153 stopReadChecker(); 154 stopConnectChecker(); 155 next.stop(); 156 } 157 158 @Override 159 public void onCommand(Object command) { 160 inReceive.set(true); 161 try { 162 transportListener.onCommand(command); 163 } finally { 164 inReceive.set(false); 165 } 166 } 167 168 @Override 169 public void oneway(Object o) throws IOException { 170 // To prevent the inactivity monitor from sending a message while we 171 // are performing a send we take the lock. 172 this.sendLock.lock(); 173 try { 174 doOnewaySend(o); 175 } finally { 176 this.sendLock.unlock(); 177 } 178 } 179 180 // Must be called under lock, either read or write on sendLock. 181 private void doOnewaySend(Object command) throws IOException { 182 if (failed.get()) { 183 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); 184 } 185 next.oneway(command); 186 } 187 188 @Override 189 public void onException(IOException error) { 190 if (failed.compareAndSet(false, true)) { 191 stopConnectChecker(); 192 stopReadChecker(); 193 if (protocolConverter != null) { 194 protocolConverter.onTransportError(); 195 } 196 transportListener.onException(error); 197 } 198 } 199 200 public long getReadGraceTime() { 201 return readGraceTime; 202 } 203 204 public void setReadGraceTime(long readGraceTime) { 205 this.readGraceTime = readGraceTime; 206 } 207 208 public long getReadKeepAliveTime() { 209 return readKeepAliveTime; 210 } 211 212 public void setReadKeepAliveTime(long readKeepAliveTime) { 213 this.readKeepAliveTime = readKeepAliveTime; 214 } 215 216 public void setProtocolConverter(MQTTProtocolConverter protocolConverter) { 217 this.protocolConverter = protocolConverter; 218 } 219 220 public MQTTProtocolConverter getProtocolConverter() { 221 return protocolConverter; 222 } 223 224 public synchronized void startConnectChecker(long connectionTimeout) { 225 this.connectionTimeout = connectionTimeout; 226 if (connectionTimeout > 0 && connectCheckerTask == null) { 227 connectCheckerTask = new SchedulerTimerTask(connectChecker); 228 229 long connectionCheckInterval = Math.min(connectionTimeout, 1000); 230 231 synchronized (AbstractInactivityMonitor.class) { 232 if (CHECKER_COUNTER == 0) { 233 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 234 ASYNC_TASKS = createExecutor(); 235 } 236 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true); 237 } 238 CHECKER_COUNTER++; 239 READ_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval); 240 } 241 } 242 } 243 244 synchronized void startReadChecker() { 245 if (readKeepAliveTime > 0 && readCheckerTask == null) { 246 readCheckerTask = new SchedulerTimerTask(readChecker); 247 248 synchronized (AbstractInactivityMonitor.class) { 249 if (CHECKER_COUNTER == 0) { 250 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 251 ASYNC_TASKS = createExecutor(); 252 } 253 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true); 254 } 255 CHECKER_COUNTER++; 256 READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime); 257 } 258 } 259 } 260 261 synchronized void stopConnectChecker() { 262 if (connectCheckerTask != null) { 263 connectCheckerTask.cancel(); 264 connectCheckerTask = null; 265 266 synchronized (AbstractInactivityMonitor.class) { 267 READ_CHECK_TIMER.purge(); 268 CHECKER_COUNTER--; 269 if (CHECKER_COUNTER == 0) { 270 READ_CHECK_TIMER.cancel(); 271 READ_CHECK_TIMER = null; 272 } 273 } 274 } 275 } 276 277 synchronized void stopReadChecker() { 278 if (readCheckerTask != null) { 279 readCheckerTask.cancel(); 280 readCheckerTask = null; 281 282 synchronized (AbstractInactivityMonitor.class) { 283 READ_CHECK_TIMER.purge(); 284 CHECKER_COUNTER--; 285 if (CHECKER_COUNTER == 0) { 286 READ_CHECK_TIMER.cancel(); 287 READ_CHECK_TIMER = null; 288 } 289 } 290 } 291 } 292 293 private final ThreadFactory factory = new ThreadFactory() { 294 @Override 295 public Thread newThread(Runnable runnable) { 296 Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable); 297 thread.setDaemon(true); 298 return thread; 299 } 300 }; 301 302 private ThreadPoolExecutor createExecutor() { 303 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 304 exec.allowCoreThreadTimeOut(true); 305 return exec; 306 } 307}