001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.transport.amqp; 019 020import java.io.IOException; 021import java.util.Timer; 022import java.util.concurrent.RejectedExecutionException; 023import java.util.concurrent.SynchronousQueue; 024import java.util.concurrent.ThreadFactory; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicBoolean; 028 029import org.apache.activemq.thread.SchedulerTimerTask; 030import org.apache.activemq.transport.AbstractInactivityMonitor; 031import org.apache.activemq.transport.InactivityIOException; 032import org.apache.activemq.transport.Transport; 033import org.apache.activemq.transport.TransportFilter; 034import org.apache.activemq.wireformat.WireFormat; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038public class AmqpInactivityMonitor extends TransportFilter { 039 040 private static final Logger LOG = LoggerFactory.getLogger(AmqpInactivityMonitor.class); 041 042 private static ThreadPoolExecutor ASYNC_TASKS; 043 private static int CONNECTION_CHECK_TASK_COUNTER; 044 private static Timer CONNECTION_CHECK_TASK_TIMER; 045 private static int KEEPALIVE_TASK_COUNTER; 046 private static Timer KEEPALIVE_TASK_TIMER; 047 048 private final AtomicBoolean failed = new AtomicBoolean(false); 049 private AmqpTransport amqpTransport; 050 051 private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT; 052 053 private SchedulerTimerTask connectCheckerTask; 054 private final Runnable connectChecker = new Runnable() { 055 056 private final long startTime = System.currentTimeMillis(); 057 058 @Override 059 public void run() { 060 long now = System.currentTimeMillis(); 061 062 if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) { 063 LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AmqpInactivityMonitor.this.toString()); 064 try { 065 ASYNC_TASKS.execute(new Runnable() { 066 @Override 067 public void run() { 068 onException(new InactivityIOException( 069 "Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress())); 070 } 071 }); 072 } catch (RejectedExecutionException ex) { 073 if (!ASYNC_TASKS.isShutdown()) { 074 LOG.error("Async connection timeout task was rejected from the executor: ", ex); 075 throw ex; 076 } 077 } 078 } 079 } 080 }; 081 082 private SchedulerTimerTask keepAliveTask; 083 private final Runnable keepAlive = new Runnable() { 084 085 @Override 086 public void run() { 087 if (keepAliveTask != null && !ASYNC_TASKS.isShutdown()) { 088 try { 089 ASYNC_TASKS.execute(new Runnable() { 090 @Override 091 public void run() { 092 try { 093 long nextIdleUpdate = amqpTransport.keepAlive(); 094 if (nextIdleUpdate > 0) { 095 synchronized (AmqpInactivityMonitor.this) { 096 if (keepAliveTask != null) { 097 keepAliveTask = new SchedulerTimerTask(keepAlive); 098 KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextIdleUpdate); 099 } 100 } 101 } 102 } catch (Exception ex) { 103 onException(new InactivityIOException( 104 "Exception while performing idle checks for connection: " + next.getRemoteAddress())); 105 } 106 } 107 }); 108 } catch (RejectedExecutionException ex) { 109 if (!ASYNC_TASKS.isShutdown()) { 110 LOG.error("Async connection timeout task was rejected from the executor: ", ex); 111 throw ex; 112 } 113 } 114 } 115 } 116 }; 117 118 public AmqpInactivityMonitor(Transport next, WireFormat wireFormat) { 119 super(next); 120 } 121 122 @Override 123 public void start() throws Exception { 124 next.start(); 125 } 126 127 @Override 128 public void stop() throws Exception { 129 stopConnectionTimeoutChecker(); 130 stopKeepAliveTask(); 131 next.stop(); 132 } 133 134 @Override 135 public void onException(IOException error) { 136 if (failed.compareAndSet(false, true)) { 137 stopConnectionTimeoutChecker(); 138 if (amqpTransport != null) { 139 amqpTransport.onException(error); 140 } 141 transportListener.onException(error); 142 } 143 } 144 145 public void setAmqpTransport(AmqpTransport amqpTransport) { 146 this.amqpTransport = amqpTransport; 147 } 148 149 public AmqpTransport getAmqpTransport() { 150 return amqpTransport; 151 } 152 153 public synchronized void startConnectionTimeoutChecker(long connectionTimeout) { 154 this.connectionTimeout = connectionTimeout; 155 if (connectionTimeout > 0 && connectCheckerTask == null) { 156 connectCheckerTask = new SchedulerTimerTask(connectChecker); 157 158 long connectionCheckInterval = Math.min(connectionTimeout, 1000); 159 160 synchronized (AbstractInactivityMonitor.class) { 161 if (CONNECTION_CHECK_TASK_COUNTER == 0) { 162 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 163 ASYNC_TASKS = createExecutor(); 164 } 165 CONNECTION_CHECK_TASK_TIMER = new Timer("AMQP InactivityMonitor State Check", true); 166 } 167 CONNECTION_CHECK_TASK_COUNTER++; 168 CONNECTION_CHECK_TASK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval); 169 } 170 } 171 } 172 173 /** 174 * Starts the keep alive task which will run after the given delay. 175 * 176 * @param nextKeepAliveCheck 177 * time in milliseconds to wait before performing the next keep-alive check. 178 */ 179 public synchronized void startKeepAliveTask(long nextKeepAliveCheck) { 180 if (nextKeepAliveCheck > 0 && keepAliveTask == null) { 181 keepAliveTask = new SchedulerTimerTask(keepAlive); 182 183 synchronized (AbstractInactivityMonitor.class) { 184 if (KEEPALIVE_TASK_COUNTER == 0) { 185 if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) { 186 ASYNC_TASKS = createExecutor(); 187 } 188 KEEPALIVE_TASK_TIMER = new Timer("AMQP InactivityMonitor Idle Update", true); 189 } 190 KEEPALIVE_TASK_COUNTER++; 191 KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextKeepAliveCheck); 192 } 193 } 194 } 195 196 public synchronized void stopConnectionTimeoutChecker() { 197 if (connectCheckerTask != null) { 198 connectCheckerTask.cancel(); 199 connectCheckerTask = null; 200 201 synchronized (AbstractInactivityMonitor.class) { 202 CONNECTION_CHECK_TASK_TIMER.purge(); 203 CONNECTION_CHECK_TASK_COUNTER--; 204 if (CONNECTION_CHECK_TASK_COUNTER == 0) { 205 CONNECTION_CHECK_TASK_TIMER.cancel(); 206 CONNECTION_CHECK_TASK_TIMER = null; 207 } 208 } 209 } 210 } 211 212 public synchronized void stopKeepAliveTask() { 213 if (keepAliveTask != null) { 214 keepAliveTask.cancel(); 215 keepAliveTask = null; 216 217 synchronized (AbstractInactivityMonitor.class) { 218 KEEPALIVE_TASK_TIMER.purge(); 219 KEEPALIVE_TASK_COUNTER--; 220 if (KEEPALIVE_TASK_COUNTER == 0) { 221 KEEPALIVE_TASK_TIMER.cancel(); 222 KEEPALIVE_TASK_TIMER = null; 223 } 224 } 225 } 226 } 227 228 private final ThreadFactory factory = new ThreadFactory() { 229 @Override 230 public Thread newThread(Runnable runnable) { 231 Thread thread = new Thread(runnable, "AmqpInactivityMonitor Async Task: " + runnable); 232 thread.setDaemon(true); 233 return thread; 234 } 235 }; 236 237 private ThreadPoolExecutor createExecutor() { 238 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 90, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory); 239 exec.allowCoreThreadTimeOut(true); 240 return exec; 241 } 242}