001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.util.Timer; 021import java.util.concurrent.RejectedExecutionException; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030import org.apache.activemq.command.KeepAliveInfo; 031import org.apache.activemq.command.WireFormatInfo; 032import org.apache.activemq.thread.SchedulerTimerTask; 033import org.apache.activemq.util.ThreadPoolUtils; 034import org.apache.activemq.wireformat.WireFormat; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Used to make sure that commands are arriving periodically from the peer of 040 * the transport. 041 */ 042public abstract class AbstractInactivityMonitor extends TransportFilter { 043 044 private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class); 045 046 private static final long DEFAULT_CHECK_TIME_MILLS = 30000; 047 048 private static ThreadPoolExecutor ASYNC_TASKS; 049 private static int CHECKER_COUNTER; 050 private static Timer READ_CHECK_TIMER; 051 private static Timer WRITE_CHECK_TIMER; 052 053 private final AtomicBoolean monitorStarted = new AtomicBoolean(false); 054 055 private final AtomicBoolean commandSent = new AtomicBoolean(false); 056 private final AtomicBoolean inSend = new AtomicBoolean(false); 057 private final AtomicBoolean failed = new AtomicBoolean(false); 058 059 private final AtomicBoolean commandReceived = new AtomicBoolean(true); 060 private final AtomicBoolean inReceive = new AtomicBoolean(false); 061 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); 062 063 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock(); 064 065 private SchedulerTimerTask connectCheckerTask; 066 private SchedulerTimerTask writeCheckerTask; 067 private SchedulerTimerTask readCheckerTask; 068 069 private long connectAttemptTimeout = DEFAULT_CHECK_TIME_MILLS; 070 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS; 071 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS; 072 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS; 073 private boolean useKeepAlive = true; 074 private boolean keepAliveResponseRequired; 075 076 protected WireFormat wireFormat; 077 078 private final Runnable connectChecker = new Runnable() { 079 080 private final long startTime = System.currentTimeMillis(); 081 082 @Override 083 public void run() { 084 long now = System.currentTimeMillis(); 085 086 if ((now - startTime) >= connectAttemptTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) { 087 LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AbstractInactivityMonitor.this.toString()); 088 try { 089 ASYNC_TASKS.execute(new Runnable() { 090 @Override 091 public void run() { 092 onException(new InactivityIOException( 093 "Channel was inactive (no connection attempt made) for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress())); 094 } 095 }); 096 } catch (RejectedExecutionException ex) { 097 if (!ASYNC_TASKS.isShutdown()) { 098 LOG.error("Async connection timeout task was rejected from the executor: ", ex); 099 throw ex; 100 } 101 } 102 } 103 } 104 }; 105 106 private final Runnable readChecker = new Runnable() { 107 long lastRunTime; 108 109 @Override 110 public void run() { 111 long now = System.currentTimeMillis(); 112 long elapsed = (now - lastRunTime); 113 114 if (lastRunTime != 0) { 115 LOG.debug("{}ms elapsed since last read check.", elapsed); 116 } 117 118 // Perhaps the timer executed a read check late.. and then executes 119 // the next read check on time which causes the time elapsed between 120 // read checks to be small.. 121 122 // If less than 90% of the read check Time elapsed then abort this 123 // read check. 124 if (!allowReadCheck(elapsed)) { 125 LOG.debug("Aborting read check...Not enough time elapsed since last read check."); 126 return; 127 } 128 129 lastRunTime = now; 130 readCheck(); 131 } 132 133 @Override 134 public String toString() { 135 return "ReadChecker"; 136 } 137 }; 138 139 private boolean allowReadCheck(long elapsed) { 140 return elapsed > (readCheckTime * 9 / 10); 141 } 142 143 private final Runnable writeChecker = new Runnable() { 144 long lastRunTime; 145 146 @Override 147 public void run() { 148 long now = System.currentTimeMillis(); 149 if (lastRunTime != 0) { 150 LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime)); 151 } 152 lastRunTime = now; 153 writeCheck(); 154 } 155 156 @Override 157 public String toString() { 158 return "WriteChecker"; 159 } 160 }; 161 162 public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) { 163 super(next); 164 this.wireFormat = wireFormat; 165 } 166 167 @Override 168 public void start() throws Exception { 169 next.start(); 170 startMonitorThreads(); 171 } 172 173 @Override 174 public void stop() throws Exception { 175 stopMonitorThreads(); 176 next.stop(); 177 } 178 179 final void writeCheck() { 180 if (inSend.get()) { 181 LOG.trace("Send in progress. Skipping write check."); 182 return; 183 } 184 185 if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) { 186 LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this); 187 188 try { 189 ASYNC_TASKS.execute(new Runnable() { 190 @Override 191 public void run() { 192 LOG.debug("Running {}", this); 193 if (monitorStarted.get()) { 194 try { 195 // If we can't get the lock it means another 196 // write beat us into the 197 // send and we don't need to heart beat now. 198 if (sendLock.writeLock().tryLock()) { 199 KeepAliveInfo info = new KeepAliveInfo(); 200 info.setResponseRequired(keepAliveResponseRequired); 201 doOnewaySend(info); 202 } 203 } catch (IOException e) { 204 onException(e); 205 } finally { 206 if (sendLock.writeLock().isHeldByCurrentThread()) { 207 sendLock.writeLock().unlock(); 208 } 209 } 210 } 211 } 212 213 @Override 214 public String toString() { 215 return "WriteCheck[" + getRemoteAddress() + "]"; 216 }; 217 }); 218 } catch (RejectedExecutionException ex) { 219 if (!ASYNC_TASKS.isShutdown()) { 220 LOG.error("Async write check was rejected from the executor: ", ex); 221 throw ex; 222 } 223 } 224 } else { 225 LOG.trace("{} message sent since last write check, resetting flag.", this); 226 } 227 228 commandSent.set(false); 229 } 230 231 final void readCheck() { 232 int currentCounter = next.getReceiveCounter(); 233 int previousCounter = lastReceiveCounter.getAndSet(currentCounter); 234 if (inReceive.get() || currentCounter != previousCounter) { 235 LOG.trace("A receive is in progress, skipping read check."); 236 return; 237 } 238 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) { 239 LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this); 240 241 try { 242 ASYNC_TASKS.execute(new Runnable() { 243 @Override 244 public void run() { 245 LOG.debug("Running {}", this); 246 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress())); 247 } 248 249 @Override 250 public String toString() { 251 return "ReadCheck[" + getRemoteAddress() + "]"; 252 }; 253 }); 254 } catch (RejectedExecutionException ex) { 255 if (!ASYNC_TASKS.isShutdown()) { 256 LOG.error("Async read check was rejected from the executor: ", ex); 257 throw ex; 258 } 259 } 260 } else { 261 if (LOG.isTraceEnabled()) { 262 LOG.trace("Message received since last read check, resetting flag: "); 263 } 264 } 265 commandReceived.set(false); 266 } 267 268 protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException; 269 270 protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException; 271 272 @Override 273 public void onCommand(Object command) { 274 commandReceived.set(true); 275 inReceive.set(true); 276 try { 277 if (command.getClass() == KeepAliveInfo.class) { 278 KeepAliveInfo info = (KeepAliveInfo) command; 279 if (info.isResponseRequired()) { 280 sendLock.readLock().lock(); 281 try { 282 info.setResponseRequired(false); 283 oneway(info); 284 } catch (IOException e) { 285 onException(e); 286 } finally { 287 sendLock.readLock().unlock(); 288 } 289 } 290 } else { 291 if (command.getClass() == WireFormatInfo.class) { 292 synchronized (this) { 293 try { 294 processInboundWireFormatInfo((WireFormatInfo) command); 295 } catch (IOException e) { 296 onException(e); 297 } 298 } 299 } 300 301 transportListener.onCommand(command); 302 } 303 } finally { 304 inReceive.set(false); 305 } 306 } 307 308 @Override 309 public void oneway(Object o) throws IOException { 310 // To prevent the inactivity monitor from sending a message while we 311 // are performing a send we take a read lock. The inactivity monitor 312 // sends its Heart-beat commands under a write lock. This means that 313 // the MutexTransport is still responsible for synchronizing sends 314 sendLock.readLock().lock(); 315 inSend.set(true); 316 try { 317 doOnewaySend(o); 318 } finally { 319 commandSent.set(true); 320 inSend.set(false); 321 sendLock.readLock().unlock(); 322 } 323 } 324 325 // Must be called under lock, either read or write on sendLock. 326 private void doOnewaySend(Object command) throws IOException { 327 if (failed.get()) { 328 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress()); 329 } 330 if (command.getClass() == WireFormatInfo.class) { 331 synchronized (this) { 332 processOutboundWireFormatInfo((WireFormatInfo) command); 333 } 334 } 335 next.oneway(command); 336 } 337 338 @Override 339 public void onException(IOException error) { 340 if (failed.compareAndSet(false, true)) { 341 stopMonitorThreads(); 342 if (sendLock.writeLock().isHeldByCurrentThread()) { 343 sendLock.writeLock().unlock(); 344 } 345 transportListener.onException(error); 346 } 347 } 348 349 public void setUseKeepAlive(boolean val) { 350 useKeepAlive = val; 351 } 352 353 public long getConnectAttemptTimeout() { 354 return connectAttemptTimeout; 355 } 356 357 public void setConnectAttemptTimeout(long connectionTimeout) { 358 this.connectAttemptTimeout = connectionTimeout; 359 } 360 361 public long getReadCheckTime() { 362 return readCheckTime; 363 } 364 365 public void setReadCheckTime(long readCheckTime) { 366 this.readCheckTime = readCheckTime; 367 } 368 369 public long getWriteCheckTime() { 370 return writeCheckTime; 371 } 372 373 public void setWriteCheckTime(long writeCheckTime) { 374 this.writeCheckTime = writeCheckTime; 375 } 376 377 public long getInitialDelayTime() { 378 return initialDelayTime; 379 } 380 381 public void setInitialDelayTime(long initialDelayTime) { 382 this.initialDelayTime = initialDelayTime; 383 } 384 385 public boolean isKeepAliveResponseRequired() { 386 return this.keepAliveResponseRequired; 387 } 388 389 public void setKeepAliveResponseRequired(boolean value) { 390 this.keepAliveResponseRequired = value; 391 } 392 393 public boolean isMonitorStarted() { 394 return this.monitorStarted.get(); 395 } 396 397 abstract protected boolean configuredOk() throws IOException; 398 399 public synchronized void startConnectCheckTask() { 400 startConnectCheckTask(getConnectAttemptTimeout()); 401 } 402 403 public synchronized void startConnectCheckTask(long connectionTimeout) { 404 if (connectionTimeout <= 0) { 405 return; 406 } 407 408 LOG.trace("Starting connection check task for: {}", this); 409 410 this.connectAttemptTimeout = connectionTimeout; 411 412 if (connectCheckerTask == null) { 413 connectCheckerTask = new SchedulerTimerTask(connectChecker); 414 415 synchronized (AbstractInactivityMonitor.class) { 416 if (CHECKER_COUNTER == 0) { 417 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 418 ASYNC_TASKS = createExecutor(); 419 } 420 if (READ_CHECK_TIMER == null) { 421 READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true); 422 } 423 } 424 CHECKER_COUNTER++; 425 READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout); 426 } 427 } 428 } 429 430 public synchronized void stopConnectCheckTask() { 431 if (connectCheckerTask != null) { 432 LOG.trace("Stopping connection check task for: {}", this); 433 connectCheckerTask.cancel(); 434 connectCheckerTask = null; 435 436 synchronized (AbstractInactivityMonitor.class) { 437 READ_CHECK_TIMER.purge(); 438 CHECKER_COUNTER--; 439 } 440 } 441 } 442 443 protected synchronized void startMonitorThreads() throws IOException { 444 if (monitorStarted.get()) { 445 return; 446 } 447 448 if (!configuredOk()) { 449 return; 450 } 451 452 if (readCheckTime > 0) { 453 readCheckerTask = new SchedulerTimerTask(readChecker); 454 } 455 456 if (writeCheckTime > 0) { 457 writeCheckerTask = new SchedulerTimerTask(writeChecker); 458 } 459 460 if (writeCheckTime > 0 || readCheckTime > 0) { 461 monitorStarted.set(true); 462 synchronized (AbstractInactivityMonitor.class) { 463 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 464 ASYNC_TASKS = createExecutor(); 465 } 466 if (READ_CHECK_TIMER == null) { 467 READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true); 468 } 469 if (WRITE_CHECK_TIMER == null) { 470 WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true); 471 } 472 473 CHECKER_COUNTER++; 474 if (readCheckTime > 0) { 475 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime); 476 } 477 if (writeCheckTime > 0) { 478 WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime); 479 } 480 } 481 } 482 } 483 484 protected synchronized void stopMonitorThreads() { 485 stopConnectCheckTask(); 486 if (monitorStarted.compareAndSet(true, false)) { 487 if (readCheckerTask != null) { 488 readCheckerTask.cancel(); 489 } 490 if (writeCheckerTask != null) { 491 writeCheckerTask.cancel(); 492 } 493 494 synchronized (AbstractInactivityMonitor.class) { 495 WRITE_CHECK_TIMER.purge(); 496 READ_CHECK_TIMER.purge(); 497 CHECKER_COUNTER--; 498 if (CHECKER_COUNTER == 0) { 499 WRITE_CHECK_TIMER.cancel(); 500 READ_CHECK_TIMER.cancel(); 501 WRITE_CHECK_TIMER = null; 502 READ_CHECK_TIMER = null; 503 try { 504 ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, TimeUnit.SECONDS.toMillis(10)); 505 } finally { 506 ASYNC_TASKS = null; 507 } 508 } 509 } 510 } 511 } 512 513 private final ThreadFactory factory = new ThreadFactory() { 514 @Override 515 public Thread newThread(Runnable runnable) { 516 Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker"); 517 thread.setDaemon(true); 518 return thread; 519 } 520 }; 521 522 private ThreadPoolExecutor createExecutor() { 523 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 524 exec.allowCoreThreadTimeOut(true); 525 return exec; 526 } 527 528 private static int getDefaultKeepAliveTime() { 529 return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30); 530 } 531}