001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.util.Timer; 021import java.util.concurrent.RejectedExecutionException; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030import org.apache.activemq.command.KeepAliveInfo; 031import org.apache.activemq.command.WireFormatInfo; 032import org.apache.activemq.thread.SchedulerTimerTask; 033import org.apache.activemq.wireformat.WireFormat; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Used to make sure that commands are arriving periodically from the peer of 039 * the transport. 040 */ 041public abstract class AbstractInactivityMonitor extends TransportFilter { 042 043 private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class); 044 045 private static final long DEFAULT_CHECK_TIME_MILLS = 30000; 046 047 private static ThreadPoolExecutor ASYNC_TASKS; 048 private static int CHECKER_COUNTER; 049 private static Timer READ_CHECK_TIMER; 050 private static Timer WRITE_CHECK_TIMER; 051 052 private final AtomicBoolean monitorStarted = new AtomicBoolean(false); 053 054 private final AtomicBoolean commandSent = new AtomicBoolean(false); 055 private final AtomicBoolean inSend = new AtomicBoolean(false); 056 private final AtomicBoolean failed = new AtomicBoolean(false); 057 058 private final AtomicBoolean commandReceived = new AtomicBoolean(true); 059 private final AtomicBoolean inReceive = new AtomicBoolean(false); 060 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 061 062 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); 063 064 private SchedulerTimerTask connectCheckerTask; 065 private SchedulerTimerTask writeCheckerTask; 066 private SchedulerTimerTask readCheckerTask; 067 068 private long connectAttemptTimeout = DEFAULT_CHECK_TIME_MILLS; 069 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; 070 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS; 071 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; 072 private boolean useKeepAlive = true; 073 private boolean keepAliveResponseRequired; 074 075 protected WireFormat wireFormat; 076 077 private final Runnable connectChecker = new Runnable() { 078 079 private final long startTime = System.currentTimeMillis(); 080 081 @Override 082 public void run() { 083 long now = System.currentTimeMillis(); 084 085 if ((now - startTime) >= connectAttemptTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) { 086 LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AbstractInactivityMonitor.this.toString()); 087 try { 088 ASYNC_TASKS.execute(new Runnable() { 089 @Override 090 public void run() { 091 onException(new InactivityIOException( 092 "Channel was inactive (no connection attempt made) for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress())); 093 } 094 }); 095 } catch (RejectedExecutionException ex) { 096 if (!ASYNC_TASKS.isShutdown()) { 097 LOG.error("Async connection timeout task was rejected from the executor: ", ex); 098 throw ex; 099 } 100 } 101 } 102 } 103 }; 104 105 private final Runnable readChecker = new Runnable() { 106 long lastRunTime; 107 108 @Override 109 public void run() { 110 long now = System.currentTimeMillis(); 111 long elapsed = (now - lastRunTime); 112 113 if (lastRunTime != 0) { 114 LOG.debug("{}ms elapsed since last read check.", elapsed); 115 } 116 117 // Perhaps the timer executed a read check late.. and then executes 118 // the next read check on time which causes the time elapsed between 119 // read checks to be small.. 120 121 // If less than 90% of the read check Time elapsed then abort this 122 // read check. 123 if (!allowReadCheck(elapsed)) { 124 LOG.debug("Aborting read check...Not enough time elapsed since last read check."); 125 return; 126 } 127 128 lastRunTime = now; 129 readCheck(); 130 } 131 132 @Override 133 public String toString() { 134 return "ReadChecker"; 135 } 136 }; 137 138 private boolean allowReadCheck(long elapsed) { 139 return elapsed > (readCheckTime * 9 / 10); 140 } 141 142 private final Runnable writeChecker = new Runnable() { 143 long lastRunTime; 144 145 @Override 146 public void run() { 147 long now = System.currentTimeMillis(); 148 if (lastRunTime != 0) { 149 LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime)); 150 } 151 lastRunTime = now; 152 writeCheck(); 153 } 154 155 @Override 156 public String toString() { 157 return "WriteChecker"; 158 } 159 }; 160 161 public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) { 162 super(next); 163 this.wireFormat = wireFormat; 164 } 165 166 @Override 167 public void start() throws Exception { 168 next.start(); 169 startMonitorThreads(); 170 } 171 172 @Override 173 public void stop() throws Exception { 174 stopMonitorThreads(); 175 next.stop(); 176 } 177 178 final void writeCheck() { 179 if (inSend.get()) { 180 LOG.trace("Send in progress. Skipping write check."); 181 return; 182 } 183 184 if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) { 185 LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this); 186 187 try { 188 ASYNC_TASKS.execute(new Runnable() { 189 @Override 190 public void run() { 191 LOG.debug("Running {}", this); 192 if (monitorStarted.get()) { 193 try { 194 // If we can't get the lock it means another 195 // write beat us into the 196 // send and we don't need to heart beat now. 197 if (sendLock.writeLock().tryLock()) { 198 KeepAliveInfo info = new KeepAliveInfo(); 199 info.setResponseRequired(keepAliveResponseRequired); 200 doOnewaySend(info); 201 } 202 } catch (IOException e) { 203 onException(e); 204 } finally { 205 if (sendLock.writeLock().isHeldByCurrentThread()) { 206 sendLock.writeLock().unlock(); 207 } 208 } 209 } 210 } 211 212 @Override 213 public String toString() { 214 return "WriteCheck[" + getRemoteAddress() + "]"; 215 }; 216 }); 217 } catch (RejectedExecutionException ex) { 218 if (!ASYNC_TASKS.isShutdown()) { 219 LOG.error("Async write check was rejected from the executor: ", ex); 220 throw ex; 221 } 222 } 223 } else { 224 LOG.trace("{} message sent since last write check, resetting flag.", this); 225 } 226 227 commandSent.set(false); 228 } 229 230 final void readCheck() { 231 int currentCounter = next.getReceiveCounter(); 232 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 233 if (inReceive.get() || currentCounter != previousCounter) { 234 LOG.trace("A receive is in progress, skipping read check."); 235 return; 236 } 237 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) { 238 LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this); 239 240 try { 241 ASYNC_TASKS.execute(new Runnable() { 242 @Override 243 public void run() { 244 LOG.debug("Running {}", this); 245 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); 246 } 247 248 @Override 249 public String toString() { 250 return "ReadCheck[" + getRemoteAddress() + "]"; 251 }; 252 }); 253 } catch (RejectedExecutionException ex) { 254 if (!ASYNC_TASKS.isShutdown()) { 255 LOG.error("Async read check was rejected from the executor: ", ex); 256 throw ex; 257 } 258 } 259 } else { 260 if (LOG.isTraceEnabled()) { 261 LOG.trace("Message received since last read check, resetting flag: "); 262 } 263 } 264 commandReceived.set(false); 265 } 266 267 protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException; 268 269 protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException; 270 271 @Override 272 public void onCommand(Object command) { 273 commandReceived.set(true); 274 inReceive.set(true); 275 try { 276 if (command.getClass() == KeepAliveInfo.class) { 277 KeepAliveInfo info = (KeepAliveInfo) command; 278 if (info.isResponseRequired()) { 279 sendLock.readLock().lock(); 280 try { 281 info.setResponseRequired(false); 282 oneway(info); 283 } catch (IOException e) { 284 onException(e); 285 } finally { 286 sendLock.readLock().unlock(); 287 } 288 } 289 } else { 290 if (command.getClass() == WireFormatInfo.class) { 291 synchronized (this) { 292 try { 293 processInboundWireFormatInfo((WireFormatInfo) command); 294 } catch (IOException e) { 295 onException(e); 296 } 297 } 298 } 299 300 transportListener.onCommand(command); 301 } 302 } finally { 303 inReceive.set(false); 304 } 305 } 306 307 @Override 308 public void oneway(Object o) throws IOException { 309 // To prevent the inactivity monitor from sending a message while we 310 // are performing a send we take a read lock. The inactivity monitor 311 // sends its Heart-beat commands under a write lock. This means that 312 // the MutexTransport is still responsible for synchronizing sends 313 sendLock.readLock().lock(); 314 inSend.set(true); 315 try { 316 doOnewaySend(o); 317 } finally { 318 commandSent.set(true); 319 inSend.set(false); 320 sendLock.readLock().unlock(); 321 } 322 } 323 324 // Must be called under lock, either read or write on sendLock. 325 private void doOnewaySend(Object command) throws IOException { 326 if (failed.get()) { 327 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); 328 } 329 if (command.getClass() == WireFormatInfo.class) { 330 synchronized (this) { 331 processOutboundWireFormatInfo((WireFormatInfo) command); 332 } 333 } 334 next.oneway(command); 335 } 336 337 @Override 338 public void onException(IOException error) { 339 if (failed.compareAndSet(false, true)) { 340 stopMonitorThreads(); 341 if (sendLock.writeLock().isHeldByCurrentThread()) { 342 sendLock.writeLock().unlock(); 343 } 344 transportListener.onException(error); 345 } 346 } 347 348 public void setUseKeepAlive(boolean val) { 349 useKeepAlive = val; 350 } 351 352 public long getConnectAttemptTimeout() { 353 return connectAttemptTimeout; 354 } 355 356 public void setConnectAttemptTimeout(long connectionTimeout) { 357 this.connectAttemptTimeout = connectionTimeout; 358 } 359 360 public long getReadCheckTime() { 361 return readCheckTime; 362 } 363 364 public void setReadCheckTime(long readCheckTime) { 365 this.readCheckTime = readCheckTime; 366 } 367 368 public long getWriteCheckTime() { 369 return writeCheckTime; 370 } 371 372 public void setWriteCheckTime(long writeCheckTime) { 373 this.writeCheckTime = writeCheckTime; 374 } 375 376 public long getInitialDelayTime() { 377 return initialDelayTime; 378 } 379 380 public void setInitialDelayTime(long initialDelayTime) { 381 this.initialDelayTime = initialDelayTime; 382 } 383 384 public boolean isKeepAliveResponseRequired() { 385 return this.keepAliveResponseRequired; 386 } 387 388 public void setKeepAliveResponseRequired(boolean value) { 389 this.keepAliveResponseRequired = value; 390 } 391 392 public boolean isMonitorStarted() { 393 return this.monitorStarted.get(); 394 } 395 396 abstract protected boolean configuredOk() throws IOException; 397 398 public synchronized void startConnectCheckTask() { 399 startConnectCheckTask(getConnectAttemptTimeout()); 400 } 401 402 public synchronized void startConnectCheckTask(long connectionTimeout) { 403 if (connectionTimeout <= 0) { 404 return; 405 } 406 407 LOG.trace("Starting connection check task for: {}", this); 408 409 this.connectAttemptTimeout = connectionTimeout; 410 411 if (connectCheckerTask == null) { 412 connectCheckerTask = new SchedulerTimerTask(connectChecker); 413 414 synchronized (AbstractInactivityMonitor.class) { 415 if (CHECKER_COUNTER == 0) { 416 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 417 ASYNC_TASKS = createExecutor(); 418 } 419 if (READ_CHECK_TIMER == null) { 420 READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true); 421 } 422 } 423 CHECKER_COUNTER++; 424 READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout); 425 } 426 } 427 } 428 429 public synchronized void stopConnectCheckTask() { 430 if (connectCheckerTask != null) { 431 LOG.trace("Stopping connection check task for: {}", this); 432 connectCheckerTask.cancel(); 433 connectCheckerTask = null; 434 435 synchronized (AbstractInactivityMonitor.class) { 436 READ_CHECK_TIMER.purge(); 437 CHECKER_COUNTER--; 438 } 439 } 440 } 441 442 protected synchronized void startMonitorThreads() throws IOException { 443 if (monitorStarted.get()) { 444 return; 445 } 446 447 if (!configuredOk()) { 448 return; 449 } 450 451 if (readCheckTime > 0) { 452 readCheckerTask = new SchedulerTimerTask(readChecker); 453 } 454 455 if (writeCheckTime > 0) { 456 writeCheckerTask = new SchedulerTimerTask(writeChecker); 457 } 458 459 if (writeCheckTime > 0 || readCheckTime > 0) { 460 monitorStarted.set(true); 461 synchronized (AbstractInactivityMonitor.class) { 462 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 463 ASYNC_TASKS = createExecutor(); 464 } 465 if (READ_CHECK_TIMER == null) { 466 READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true); 467 } 468 if (WRITE_CHECK_TIMER == null) { 469 WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true); 470 } 471 472 CHECKER_COUNTER++; 473 if (readCheckTime > 0) { 474 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); 475 } 476 if (writeCheckTime > 0) { 477 WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime); 478 } 479 } 480 } 481 } 482 483 protected synchronized void stopMonitorThreads() { 484 stopConnectCheckTask(); 485 if (monitorStarted.compareAndSet(true, false)) { 486 if (readCheckerTask != null) { 487 readCheckerTask.cancel(); 488 } 489 if (writeCheckerTask != null) { 490 writeCheckerTask.cancel(); 491 } 492 493 synchronized (AbstractInactivityMonitor.class) { 494 WRITE_CHECK_TIMER.purge(); 495 READ_CHECK_TIMER.purge(); 496 CHECKER_COUNTER--; 497 if (CHECKER_COUNTER == 0) { 498 WRITE_CHECK_TIMER.cancel(); 499 READ_CHECK_TIMER.cancel(); 500 WRITE_CHECK_TIMER = null; 501 READ_CHECK_TIMER = null; 502 } 503 } 504 } 505 } 506 507 private final ThreadFactory factory = new ThreadFactory() { 508 @Override 509 public Thread newThread(Runnable runnable) { 510 Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker"); 511 thread.setDaemon(true); 512 return thread; 513 } 514 }; 515 516 private ThreadPoolExecutor createExecutor() { 517 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 518 exec.allowCoreThreadTimeOut(true); 519 return exec; 520 } 521 522 private static int getDefaultKeepAliveTime() { 523 return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30); 524 } 525}