/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp;

import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The AmqpTransportFilter normally sits on top of a TcpTransport that has been
 * configured with the AmqpWireFormat and is used to convert AMQP commands to
 * ActiveMQ commands. All of the conversion work is done by delegating to the
 * {@link AmqpProtocolConverter}.
 * <p>
 * Every call into the protocol converter made by this class is performed while
 * holding a single {@link ReentrantLock} owned by the filter.
 */
public class AmqpTransportFilter extends TransportFilter implements AmqpTransport {

    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransportFilter.class);

    /** Logger named {@code <this package>.BYTES}; used here to trace received and sent data. */
    static final Logger TRACE_BYTES =
        LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".BYTES");

    /** Logger named {@code <this package>.FRAMES}. */
    public static final Logger TRACE_FRAMES =
        LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");

    private AmqpProtocolConverter protocolConverter;
    private AmqpWireFormat wireFormat;
    private AmqpInactivityMonitor monitor;

    private boolean trace;

    /** Guards every invocation of the protocol converter made from this filter. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Creates a filter layered over the given transport.
     * <p>
     * The protocol converter is initialised to a new {@link AmqpProtocolDiscriminator}.
     * The wire format is retained only when it is an {@link AmqpWireFormat}; for any
     * other type the internal reference stays {@code null}, in which case the
     * wire-format-backed accessors of this class will throw {@link NullPointerException}.
     *
     * @param next the transport this filter wraps
     * @param wireFormat the wire format configured for the connection
     * @param brokerService the broker service handed to the protocol discriminator
     */
    public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
        super(next);
        // The converter is created before the wire format is stored, as in the original ordering.
        this.protocolConverter = new AmqpProtocolDiscriminator(this, brokerService);
        this.wireFormat = wireFormat instanceof AmqpWireFormat ? (AmqpWireFormat) wireFormat : null;
    }

    /**
     * Starts the filter. When an inactivity monitor has been set, it is bound to this
     * transport and its connection timeout checker is started with
     * {@link #getConnectAttemptTimeout()} before the wrapped transport is started.
     *
     * @throws Exception if the monitor or the underlying start fails
     */
    @Override
    public void start() throws Exception {
        if (monitor != null) {
            monitor.setAmqpTransport(this);
            monitor.startConnectionTimeoutChecker(getConnectAttemptTimeout());
        }

        super.start();
    }

    /**
     * Hands an ActiveMQ command to the protocol converter while holding the filter lock.
     *
     * @param o the object to send; it is cast to {@link Command}
     * @throws IOException wrapping any exception raised by the cast or by the converter
     */
    @Override
    public void oneway(Object o) throws IOException {
        try {
            // Cast before taking the lock so a bad argument never touches the lock.
            final Command outbound = (Command) o;

            lock.lock();
            try {
                protocolConverter.onActiveMQCommand(outbound);
            } finally {
                lock.unlock();
            }
        } catch (Exception cause) {
            throw IOExceptionSupport.create(cause);
        }
    }

    /**
     * Reports a transport error to the protocol converter while holding the filter lock.
     *
     * @param error the error to report
     */
    @Override
    public void onException(IOException error) {
        lock.lock();
        try {
            protocolConverter.onAMQPException(error);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Passes an error to the superclass exception handling, bypassing the protocol converter.
     *
     * @param error the error to pass on
     */
    @Override
    public void sendToActiveMQ(IOException error) {
        super.onException(error);
    }

    /**
     * Feeds incoming data to the protocol converter while holding the filter lock.
     * An {@link IOException} is routed to {@link #handleException(IOException)}; any
     * other exception is wrapped and routed to {@link #onException(IOException)}.
     *
     * @param command the incoming data
     */
    @Override
    public void onCommand(Object command) {
        try {
            if (trace) {
                TRACE_BYTES.trace("Received: \n{}", command);
            }

            lock.lock();
            try {
                protocolConverter.onAMQPData(command);
            } finally {
                lock.unlock();
            }
        } catch (IOException ioFailure) {
            handleException(ioFailure);
        } catch (Exception failure) {
            onException(IOExceptionSupport.create(failure));
        }
    }

    /**
     * Delivers a command to the transport listener, if one is set. Asserts that the
     * calling thread holds the filter lock.
     *
     * @param command the command to deliver
     */
    @Override
    public void sendToActiveMQ(Command command) {
        assert lock.isHeldByCurrentThread();

        final TransportListener listener = transportListener;
        if (listener == null) {
            return;
        }
        listener.onCommand(command);
    }

    /**
     * Sends data through the wrapped transport, if there is one. Asserts that the
     * calling thread holds the filter lock.
     *
     * @param command the data to send
     * @throws IOException if the wrapped transport fails to send
     */
    @Override
    public void sendToAmqp(Object command) throws IOException {
        assert lock.isHeldByCurrentThread();

        if (trace) {
            TRACE_BYTES.trace("Sending: \n{}", command);
        }

        final Transport downstream = next;
        if (downstream == null) {
            return;
        }
        downstream.oneway(command);
    }

    /**
     * Asks the protocol converter for its keep-alive value while holding the filter lock.
     * Failures are routed exactly as in {@link #onCommand(Object)}.
     *
     * @return the value returned by the converter, or {@code 0} if the call failed
     */
    @Override
    public long keepAlive() {
        long nextDelay = 0L;

        try {
            lock.lock();
            try {
                nextDelay = protocolConverter.keepAlive();
            } finally {
                lock.unlock();
            }
        } catch (IOException ioFailure) {
            handleException(ioFailure);
        } catch (Exception failure) {
            onException(IOExceptionSupport.create(failure));
        }

        return nextDelay;
    }

    /**
     * Returns the peer certificates of the wrapped transport when it is an
     * {@link SslTransport}.
     *
     * @return the peer certificates reported by the SSL transport (possibly {@code null}),
     *         or {@code null} when the wrapped transport is not an {@link SslTransport}
     */
    @Override
    public X509Certificate[] getPeerCertificates() {
        if (!(next instanceof SslTransport)) {
            return null;
        }

        final X509Certificate[] certificates = ((SslTransport) next).getPeerCertificates();
        if (trace && certificates != null) {
            LOG.debug("Peer Identity has been verified\n");
        }
        return certificates;
    }

    /**
     * @return whether tracing is enabled on this filter
     */
    @Override
    public boolean isTrace() {
        return trace;
    }

    /**
     * Sets the trace flag and then asks the protocol converter to update its tracer.
     *
     * @param trace the new trace setting
     */
    public void setTrace(boolean trace) {
        this.trace = trace;
        this.protocolConverter.updateTracer();
    }

    /**
     * @return the AMQP wire format stored at construction, or {@code null} if the
     *         supplied wire format was not an {@link AmqpWireFormat}
     */
    @Override
    public AmqpWireFormat getWireFormat() {
        return this.wireFormat;
    }

    /**
     * Passes an error to the superclass exception handling, bypassing the protocol converter.
     *
     * @param e the error to pass on
     */
    public void handleException(IOException e) {
        super.onException(e);
    }

    /**
     * @return the transformer name held by the wire format
     */
    @Override
    public String getTransformer() {
        return wireFormat.getTransformer();
    }

    /**
     * @param transformer the transformer name to store on the wire format
     */
    public void setTransformer(String transformer) {
        wireFormat.setTransformer(transformer);
    }

    /**
     * @return the protocol converter currently used by this filter
     */
    @Override
    public AmqpProtocolConverter getProtocolConverter() {
        return protocolConverter;
    }

    /**
     * Replaces the protocol converter used by this filter.
     *
     * @param protocolConverter the converter to use from now on
     */
    @Override
    public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
        this.protocolConverter = protocolConverter;
    }

    /**
     * @param producerCredit the producer credit to store on the wire format
     */
    public void setProducerCredit(int producerCredit) {
        wireFormat.setProducerCredit(producerCredit);
    }

    /**
     * @return the producer credit held by the wire format
     */
    public int getProducerCredit() {
        return wireFormat.getProducerCredit();
    }

    /**
     * @param monitor the inactivity monitor to use, or {@code null} for none
     */
    @Override
    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
        this.monitor = monitor;
    }

    /**
     * @return the inactivity monitor, or {@code null} if none has been set
     */
    @Override
    public AmqpInactivityMonitor getInactivityMonitor() {
        return monitor;
    }

    /**
     * @return {@code true} when an inactivity monitor has been set
     */
    @Override
    public boolean isUseInactivityMonitor() {
        return monitor != null;
    }

    /**
     * @return the connect attempt timeout held by the wire format
     */
    public int getConnectAttemptTimeout() {
        return wireFormat.getConnectAttemptTimeout();
    }

    /**
     * @param connectAttemptTimeout the connect attempt timeout to store on the wire format
     */
    public void setConnectAttemptTimeout(int connectAttemptTimeout) {
        wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
    }

    /**
     * @return the maximum frame size held by the wire format
     */
    public long getMaxFrameSize() {
        return wireFormat.getMaxFrameSize();
    }

    /**
     * @param maxFrameSize the maximum frame size to store on the wire format
     */
    public void setMaxFrameSize(long maxFrameSize) {
        wireFormat.setMaxFrameSize(maxFrameSize);
    }
}