001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.amqp;
019
020import java.io.IOException;
021import java.util.Timer;
022import java.util.concurrent.RejectedExecutionException;
023import java.util.concurrent.SynchronousQueue;
024import java.util.concurrent.ThreadFactory;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import org.apache.activemq.thread.SchedulerTimerTask;
030import org.apache.activemq.transport.AbstractInactivityMonitor;
031import org.apache.activemq.transport.InactivityIOException;
032import org.apache.activemq.transport.Transport;
033import org.apache.activemq.transport.TransportFilter;
034import org.apache.activemq.wireformat.WireFormat;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038public class AmqpInactivityMonitor extends TransportFilter {
039
040    private static final Logger LOG = LoggerFactory.getLogger(AmqpInactivityMonitor.class);
041
042    private static ThreadPoolExecutor ASYNC_TASKS;
043    private static int CONNECTION_CHECK_TASK_COUNTER;
044    private static Timer CONNECTION_CHECK_TASK_TIMER;
045    private static int KEEPALIVE_TASK_COUNTER;
046    private static Timer KEEPALIVE_TASK_TIMER;
047
048    private final AtomicBoolean failed = new AtomicBoolean(false);
049    private AmqpTransport amqpTransport;
050
051    private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
052
053    private SchedulerTimerTask connectCheckerTask;
054    private final Runnable connectChecker = new Runnable() {
055
056        private final long startTime = System.currentTimeMillis();
057
058        @Override
059        public void run() {
060            long now = System.currentTimeMillis();
061
062            if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
063                LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AmqpInactivityMonitor.this.toString());
064                try {
065                    ASYNC_TASKS.execute(new Runnable() {
066                        @Override
067                        public void run() {
068                            onException(new InactivityIOException(
069                                "Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress()));
070                        }
071                    });
072                } catch (RejectedExecutionException ex) {
073                    if (!ASYNC_TASKS.isShutdown()) {
074                        LOG.error("Async connection timeout task was rejected from the executor: ", ex);
075                        throw ex;
076                    }
077                }
078            }
079        }
080    };
081
082    private SchedulerTimerTask keepAliveTask;
083    private final Runnable keepAlive = new Runnable() {
084
085        @Override
086        public void run() {
087            if (keepAliveTask != null && !ASYNC_TASKS.isShutdown()) {
088                try {
089                    ASYNC_TASKS.execute(new Runnable() {
090                        @Override
091                        public void run() {
092                            try {
093                                long nextIdleUpdate = amqpTransport.keepAlive();
094                                if (nextIdleUpdate > 0) {
095                                    synchronized (AmqpInactivityMonitor.this) {
096                                        if (keepAliveTask != null) {
097                                            keepAliveTask = new SchedulerTimerTask(keepAlive);
098                                            KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextIdleUpdate);
099                                        }
100                                    }
101                                }
102                            } catch (Exception ex) {
103                                onException(new InactivityIOException(
104                                    "Exception while performing idle checks for connection: " + next.getRemoteAddress()));
105                            }
106                        }
107                    });
108                } catch (RejectedExecutionException ex) {
109                    if (!ASYNC_TASKS.isShutdown()) {
110                        LOG.error("Async connection timeout task was rejected from the executor: ", ex);
111                        throw ex;
112                    }
113                }
114            }
115        }
116    };
117
118    public AmqpInactivityMonitor(Transport next, WireFormat wireFormat) {
119        super(next);
120    }
121
122    @Override
123    public void start() throws Exception {
124        next.start();
125    }
126
127    @Override
128    public void stop() throws Exception {
129        stopConnectionTimeoutChecker();
130        stopKeepAliveTask();
131        next.stop();
132    }
133
134    @Override
135    public void onException(IOException error) {
136        if (failed.compareAndSet(false, true)) {
137            stopConnectionTimeoutChecker();
138            if (amqpTransport != null) {
139                amqpTransport.onException(error);
140            }
141            transportListener.onException(error);
142        }
143    }
144
145    public void setAmqpTransport(AmqpTransport amqpTransport) {
146        this.amqpTransport = amqpTransport;
147    }
148
149    public AmqpTransport getAmqpTransport() {
150        return amqpTransport;
151    }
152
153    public synchronized void startConnectionTimeoutChecker(long connectionTimeout) {
154        this.connectionTimeout = connectionTimeout;
155        if (connectionTimeout > 0 && connectCheckerTask == null) {
156            connectCheckerTask = new SchedulerTimerTask(connectChecker);
157
158            long connectionCheckInterval = Math.min(connectionTimeout, 1000);
159
160            synchronized (AbstractInactivityMonitor.class) {
161                if (CONNECTION_CHECK_TASK_COUNTER == 0) {
162                    if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
163                        ASYNC_TASKS = createExecutor();
164                    }
165                    CONNECTION_CHECK_TASK_TIMER = new Timer("AMQP InactivityMonitor State Check", true);
166                }
167                CONNECTION_CHECK_TASK_COUNTER++;
168                CONNECTION_CHECK_TASK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
169            }
170        }
171    }
172
173    /**
174     * Starts the keep alive task which will run after the given delay.
175     *
176     * @param nextKeepAliveCheck
177     *        time in milliseconds to wait before performing the next keep-alive check.
178     */
179    public synchronized void startKeepAliveTask(long nextKeepAliveCheck) {
180        if (nextKeepAliveCheck > 0 && keepAliveTask == null) {
181            keepAliveTask = new SchedulerTimerTask(keepAlive);
182
183            synchronized (AbstractInactivityMonitor.class) {
184                if (KEEPALIVE_TASK_COUNTER == 0) {
185                    if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
186                        ASYNC_TASKS = createExecutor();
187                    }
188                    KEEPALIVE_TASK_TIMER = new Timer("AMQP InactivityMonitor Idle Update", true);
189                }
190                KEEPALIVE_TASK_COUNTER++;
191                KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextKeepAliveCheck);
192            }
193        }
194    }
195
196    public synchronized void stopConnectionTimeoutChecker() {
197        if (connectCheckerTask != null) {
198            connectCheckerTask.cancel();
199            connectCheckerTask = null;
200
201            synchronized (AbstractInactivityMonitor.class) {
202                CONNECTION_CHECK_TASK_TIMER.purge();
203                CONNECTION_CHECK_TASK_COUNTER--;
204                if (CONNECTION_CHECK_TASK_COUNTER == 0) {
205                    CONNECTION_CHECK_TASK_TIMER.cancel();
206                    CONNECTION_CHECK_TASK_TIMER = null;
207                }
208            }
209        }
210    }
211
212    public synchronized void stopKeepAliveTask() {
213        if (keepAliveTask != null) {
214            keepAliveTask.cancel();
215            keepAliveTask = null;
216
217            synchronized (AbstractInactivityMonitor.class) {
218                KEEPALIVE_TASK_TIMER.purge();
219                KEEPALIVE_TASK_COUNTER--;
220                if (KEEPALIVE_TASK_COUNTER == 0) {
221                    KEEPALIVE_TASK_TIMER.cancel();
222                    KEEPALIVE_TASK_TIMER = null;
223                }
224            }
225        }
226    }
227
228    private final ThreadFactory factory = new ThreadFactory() {
229        @Override
230        public Thread newThread(Runnable runnable) {
231            Thread thread = new Thread(runnable, "AmqpInactivityMonitor Async Task: " + runnable);
232            thread.setDaemon(true);
233            return thread;
234        }
235    };
236
237    private ThreadPoolExecutor createExecutor() {
238        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 90, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
239        exec.allowCoreThreadTimeOut(true);
240        return exec;
241    }
242}