001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.transport; 019 020import java.io.IOException; 021import java.net.Socket; 022import java.util.Iterator; 023import java.util.concurrent.ConcurrentLinkedQueue; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.activemq.transport.tcp.TimeStampStream; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * This filter implements write timeouts for socket write operations. 032 * When using blocking IO, the Java implementation doesn't have an explicit flag 033 * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions, 034 * which is usually around 13-30 minutes).<br/> 035 * To enable this transport, in the transport URI, simpley add<br/> 036 * <code>transport.soWriteTimeout=<value in millis></code>.<br/> 037 * For example (15 second timeout on write operations to the socket):</br> 038 * <pre><code> 039 * <transportConnector 040 * name="tcp1" 041 * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000" 042 * /> 043 * </code></pre><br/> 044 * For example (enable default timeout on the socket):</br> 045 * <pre><code> 046 * <transportConnector 047 * name="tcp1" 048 * uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000" 049 * /> 050 * </code></pre> 051 * @author Filip Hanik 052 * 053 */ 054public class WriteTimeoutFilter extends TransportFilter { 055 056 private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class); 057 protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>(); 058 protected static AtomicInteger messageCounter = new AtomicInteger(0); 059 protected static TimeoutThread timeoutThread = new TimeoutThread(); 060 061 protected static long sleep = 5000l; 062 063 protected long writeTimeout = -1; 064 065 public WriteTimeoutFilter(Transport next) { 066 super(next); 067 } 068 069 @Override 070 public void oneway(Object command) throws IOException { 071 try { 072 registerWrite(this); 073 super.oneway(command); 074 } catch (IOException x) { 075 throw x; 076 } finally { 077 deRegisterWrite(this,false,null); 078 } 079 } 080 081 public long getWriteTimeout() { 082 return writeTimeout; 083 } 084 085 public void setWriteTimeout(long writeTimeout) { 086 this.writeTimeout = writeTimeout; 087 } 088 089 public static long getSleep() { 090 return sleep; 091 } 092 093 public static void setSleep(long sleep) { 094 WriteTimeoutFilter.sleep = sleep; 095 } 096 097 098 protected TimeStampStream getWriter() { 099 return next.narrow(TimeStampStream.class); 100 } 101 102 protected Socket getSocket() { 103 return next.narrow(Socket.class); 104 } 105 106 protected static void registerWrite(WriteTimeoutFilter filter) { 107 writers.add(filter); 108 } 109 110 protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) { 111 boolean result = writers.remove(filter); 112 if (result) { 113 if (fail) { 114 String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress(); 115 LOG.warn(message); 116 Socket sock = filter.getSocket(); 117 if (sock==null) { 118 LOG.error("Destination socket is null, unable to close socket.("+message+")"); 119 } else { 120 try { 121 sock.close(); 122 }catch (IOException ignore) { 123 } 124 } 125 } 126 } 127 return result; 128 } 129 130 @Override 131 public void start() throws Exception { 132 super.start(); 133 } 134 135 @Override 136 public void stop() throws Exception { 137 super.stop(); 138 } 139 140 protected static class TimeoutThread extends Thread { 141 static AtomicInteger instance = new AtomicInteger(0); 142 boolean run = true; 143 public TimeoutThread() { 144 setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet()); 145 setDaemon(true); 146 setPriority(Thread.MIN_PRIORITY); 147 start(); 148 } 149 150 151 public void run() { 152 while (run) { 153 boolean error = false; 154 try { 155 if (!interrupted()) { 156 Iterator<WriteTimeoutFilter> filters = writers.iterator(); 157 while (run && filters.hasNext()) { 158 WriteTimeoutFilter filter = filters.next(); 159 if (filter.getWriteTimeout()<=0) continue; //no timeout set 160 long writeStart = filter.getWriter().getWriteTimestamp(); 161 long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1; 162 if (delta>filter.getWriteTimeout()) { 163 WriteTimeoutFilter.deRegisterWrite(filter, true,null); 164 }//if timeout 165 }//while 166 }//if interrupted 167 try { 168 Thread.sleep(getSleep()); 169 error = false; 170 } catch (InterruptedException x) { 171 //do nothing 172 } 173 }catch (Throwable t) { //make sure this thread never dies 174 if (!error) { //use error flag to avoid filling up the logs 175 LOG.error("WriteTimeout thread unable validate existing sockets.",t); 176 error = true; 177 } 178 } 179 } 180 } 181 } 182 183}