/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Negotiates the wire format with a new connection.
 * <p>
 * On the first {@link #start()} the local preferred {@link WireFormatInfo} is
 * sent to the peer. Outgoing commands passed to {@link #oneway(Object)} are
 * held back until the peer's {@link WireFormatInfo} has been processed, an
 * error has been reported through {@link #onException(IOException)}, the
 * filter has been stopped, or the negotiate timeout has elapsed.
 */
public class WireFormatNegotiator extends TransportFilter {

    private static final Logger LOG = LoggerFactory.getLogger(WireFormatNegotiator.class);

    private final OpenWireFormat wireFormat;
    private final int minimumVersion;
    private long negotiateTimeout = 15000L;

    /** Ensures the local wire format info is only sent on the first start. */
    private final AtomicBoolean firstStart = new AtomicBoolean(true);
    /** Released once negotiation has finished, failed, or the filter was stopped. */
    private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
    /** Released once the attempt to send the local wire format info has completed. */
    private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);

    /**
     * Creates a negotiator wrapping the given transport.
     *
     * @param next           the transport this filter delegates to
     * @param wireFormat     the wire format to negotiate
     * @param minimumVersion the lowest remote wire format version that is
     *                       accepted; values less than or equal to zero are
     *                       treated as 1
     */
    public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
        super(next);
        this.wireFormat = wireFormat;
        this.minimumVersion = minimumVersion <= 0 ? 1 : minimumVersion;

        // Setup the initial negotiation timeout to be the same as the initial max inactivity delay
        // specified on the wireformat. Does not make sense for us to take longer.
        try {
            WireFormatInfo preferred = wireFormat.getPreferedWireFormatInfo();
            if (preferred != null) {
                setNegotiateTimeout(preferred.getMaxInactivityDurationInitalDelay());
            }
        } catch (IOException e) {
            // Best effort only: the default timeout stays in place, but the failure is no
            // longer swallowed silently.
            LOG.debug("Unable to read preferred wire format info; keeping default negotiate timeout", e);
        }
    }

    /**
     * Starts the underlying transport and, on the first invocation only, sends
     * the local wire format info to the peer.
     *
     * @throws Exception if the underlying transport fails to start or the wire
     *                   format info cannot be sent
     */
    @Override
    public void start() throws Exception {
        super.start();
        if (firstStart.compareAndSet(true, false)) {
            sendWireFormat();
        }
    }

    /**
     * Sends the preferred wire format info of the local wire format to the peer.
     * The "wire info sent" latch is released whether or not sending succeeds,
     * so {@link #negociate(WireFormatInfo)} is not left waiting after a failure.
     *
     * @throws IOException if the info cannot be obtained or sent
     */
    public void sendWireFormat() throws IOException {
        try {
            WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
            LOG.debug("Sending: {}", info);
            sendWireFormat(info);
        } finally {
            wireInfoSentDownLatch.countDown();
        }
    }

    /**
     * Stops the underlying transport and releases any thread blocked in
     * {@link #oneway(Object)} waiting for negotiation.
     *
     * @throws Exception if the underlying transport fails to stop
     */
    @Override
    public void stop() throws Exception {
        super.stop();
        readyCountDownLatch.countDown();
    }

    /**
     * Sends a command once the ready latch has been released, waiting up to the
     * negotiate timeout for that to happen.
     *
     * @param command the command to send
     * @throws IOException            if the ready latch is not released within
     *                                the negotiate timeout, or the underlying
     *                                transport fails
     * @throws InterruptedIOException if the calling thread is interrupted while
     *                                waiting; the thread's interrupt flag is set
     *                                again before this is thrown
     */
    @Override
    public void oneway(Object command) throws IOException {
        // Clear a pending interrupt so the wait below is not aborted immediately; the flag is
        // restored in the finally block.
        boolean wasInterrupted = Thread.interrupted();
        try {
            if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
            }
        } catch (InterruptedException e) {
            InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");
            interruptedIOException.initCause(e);
            try {
                onException(interruptedIOException);
            } finally {
                Thread.currentThread().interrupt();
                // The flag has just been restored; avoid doing it a second time below.
                wasInterrupted = false;
            }
            throw interruptedIOException;
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
        super.oneway(command);
    }

    /**
     * Handles an incoming command. A {@link WireFormatInfo} command triggers
     * negotiation first; every command, including the wire format info, is then
     * passed on to the transport listener.
     *
     * @param o the received command; cast to {@link Command} without a type check
     */
    @Override
    public void onCommand(Object o) {
        Command command = (Command)o;
        if (command.isWireFormatInfo()) {
            WireFormatInfo info = (WireFormatInfo)command;
            negociate(info);
        }
        getTransportListener().onCommand(command);
    }

    /**
     * Negotiates the wire format using the info received from the peer.
     * <p>
     * Waits until the local wire format info has been sent, validates the
     * remote info, renegotiates the local wire format against it and, when the
     * underlying transport can be narrowed to a {@link Socket}, applies the
     * negotiated TCP no-delay setting. Failures are reported through
     * {@link #onException(IOException)}. In every case the ready latch is
     * released and {@link #onWireFormatNegotiated(WireFormatInfo)} is invoked.
     *
     * @param info the wire format info received from the peer
     */
    public void negociate(WireFormatInfo info) {
        LOG.debug("Received WireFormat: {}", info);

        try {
            wireInfoSentDownLatch.await();

            LOG.debug("{} before negotiation: {}", this, wireFormat);
            // NOTE(review): a validation failure is reported to the listener, but negotiation
            // still continues below with the rejected info. Left unchanged because subclasses
            // may depend on onWireFormatNegotiated always being called - confirm whether an
            // early exit is wanted here.
            if (!info.isValid()) {
                onException(new IOException("Remote wire format magic is invalid"));
            } else if (info.getVersion() < minimumVersion) {
                onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
            }

            wireFormat.renegotiateWireFormat(info);
            Socket socket = next.narrow(Socket.class);
            if (socket != null) {
                socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
            }

            LOG.debug("{} after negotiation: {}", this, wireFormat);

        } catch (IOException e) {
            onException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            onException((IOException)new InterruptedIOException().initCause(e));
        } catch (Exception e) {
            onException(IOExceptionSupport.create(e));
        }
        readyCountDownLatch.countDown();
        onWireFormatNegotiated(info);
    }

    /**
     * Releases any thread waiting for negotiation and then forwards the error
     * to the parent filter.
     *
     * @param error the error to report
     */
    @Override
    public void onException(IOException error) {
        // Release waiting senders first so they do not sit out the full negotiate timeout.
        readyCountDownLatch.countDown();
        super.onException(error);
    }

    /**
     * @return the string representation of the wrapped transport
     */
    @Override
    public String toString() {
        return next.toString();
    }

    /**
     * Sends the given wire format info through the wrapped transport.
     *
     * @param info the wire format info to send
     * @throws IOException if the wrapped transport fails to send it
     */
    protected void sendWireFormat(WireFormatInfo info) throws IOException {
        next.oneway(info);
    }

    /**
     * Hook invoked at the end of {@link #negociate(WireFormatInfo)}. The default
     * implementation does nothing.
     *
     * @param info the wire format info received from the peer
     */
    protected void onWireFormatNegotiated(WireFormatInfo info) {
    }

    /**
     * @return the time in milliseconds that {@link #oneway(Object)} waits for
     *         negotiation before failing
     */
    public long getNegotiateTimeout() {
        return negotiateTimeout;
    }

    /**
     * @param negotiateTimeout the time in milliseconds that
     *                         {@link #oneway(Object)} waits for negotiation
     *                         before failing
     */
    public void setNegotiateTimeout(long negotiateTimeout) {
        this.negotiateTimeout = negotiateTimeout;
    }
}