/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.protocol;

import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;

import java.io.IOException;

import javax.jms.Destination;
import javax.jms.ResourceAllocationException;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
041import org.apache.activemq.transport.amqp.message.EncodedMessage; 042import org.apache.activemq.transport.amqp.message.InboundTransformer; 043import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; 044import org.apache.activemq.util.LongSequenceGenerator; 045import org.apache.qpid.proton.amqp.Symbol; 046import org.apache.qpid.proton.amqp.messaging.Accepted; 047import org.apache.qpid.proton.amqp.messaging.Rejected; 048import org.apache.qpid.proton.amqp.transaction.TransactionalState; 049import org.apache.qpid.proton.amqp.transport.AmqpError; 050import org.apache.qpid.proton.amqp.transport.DeliveryState; 051import org.apache.qpid.proton.amqp.transport.ErrorCondition; 052import org.apache.qpid.proton.engine.Delivery; 053import org.apache.qpid.proton.engine.Receiver; 054import org.fusesource.hawtbuf.Buffer; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer 060 * which holds the corresponding Sender which transfers message accross the 061 * link. The AmqpReceiver handles all incoming deliveries by converting them 062 * or wrapping them into an ActiveMQ message object and forwarding that message 063 * on to the appropriate ActiveMQ Destination. 064 */ 065public class AmqpReceiver extends AmqpAbstractReceiver { 066 067 private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class); 068 069 private final ProducerInfo producerInfo; 070 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 071 072 private InboundTransformer inboundTransformer; 073 074 /** 075 * Create a new instance of an AmqpReceiver 076 * 077 * @param session 078 * the Session that is the parent of this AmqpReceiver instance. 079 * @param endpoint 080 * the AMQP receiver endpoint that the class manages. 081 * @param producerInfo 082 * the ProducerInfo instance that contains this sender's configuration. 
083 */ 084 public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) { 085 super(session, endpoint); 086 087 this.producerInfo = producerInfo; 088 } 089 090 @Override 091 public void close() { 092 if (!isClosed() && isOpened()) { 093 sendToActiveMQ(new RemoveInfo(getProducerId())); 094 } 095 096 super.close(); 097 } 098 099 //----- Configuration accessors ------------------------------------------// 100 101 /** 102 * @return the ActiveMQ ProducerId used to register this Receiver on the Broker. 103 */ 104 public ProducerId getProducerId() { 105 return producerInfo.getProducerId(); 106 } 107 108 @Override 109 public ActiveMQDestination getDestination() { 110 return producerInfo.getDestination(); 111 } 112 113 @Override 114 public void setDestination(ActiveMQDestination destination) { 115 producerInfo.setDestination(destination); 116 } 117 118 /** 119 * If the Sender that initiated this Receiver endpoint did not define an address 120 * then it is using anonymous mode and message are to be routed to the address 121 * that is defined in the AMQP message 'To' field. 122 * 123 * @return true if this Receiver should operate in anonymous mode. 
124 */ 125 public boolean isAnonymous() { 126 return producerInfo.getDestination() == null; 127 } 128 129 //----- Internal Implementation ------------------------------------------// 130 131 protected InboundTransformer getTransformer() { 132 if (inboundTransformer == null) { 133 String transformer = session.getConnection().getConfiguredTransformer(); 134 if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) { 135 inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); 136 } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) { 137 inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); 138 } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) { 139 inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE); 140 } else { 141 LOG.warn("Unknown transformer type {} using native one instead", transformer); 142 inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); 143 } 144 } 145 return inboundTransformer; 146 } 147 148 @Override 149 protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception { 150 if (!isClosed()) { 151 EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length); 152 153 InboundTransformer transformer = getTransformer(); 154 ActiveMQMessage message = null; 155 156 while (transformer != null) { 157 try { 158 message = (ActiveMQMessage) transformer.transform(em); 159 break; 160 } catch (Exception e) { 161 LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName()); 162 LOG.trace("Transformation error:", e); 163 164 transformer = transformer.getFallbackTransformer(); 165 } 166 } 167 168 if (message == null) { 169 throw new IOException("Failed to transform incoming delivery, skipping."); 170 } 171 172 current = null; 173 174 if (isAnonymous()) 
{ 175 Destination toDestination = message.getJMSDestination(); 176 if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) { 177 Rejected rejected = new Rejected(); 178 ErrorCondition condition = new ErrorCondition(); 179 condition.setCondition(Symbol.valueOf("failed")); 180 condition.setDescription("Missing to field for message sent to an anonymous producer"); 181 rejected.setError(condition); 182 delivery.disposition(rejected); 183 return; 184 } 185 } else { 186 message.setJMSDestination(getDestination()); 187 } 188 189 message.setProducerId(getProducerId()); 190 191 // Always override the AMQP client's MessageId with our own. Preserve 192 // the original in the TextView property for later Ack. 193 MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId()); 194 195 MessageId amqpMessageId = message.getMessageId(); 196 if (amqpMessageId != null) { 197 if (amqpMessageId.getTextView() != null) { 198 messageId.setTextView(amqpMessageId.getTextView()); 199 } else { 200 messageId.setTextView(amqpMessageId.toString()); 201 } 202 } 203 204 message.setMessageId(messageId); 205 206 LOG.trace("Inbound Message:{} from Producer:{}", 207 message.getMessageId(), getProducerId() + ":" + messageId.getProducerSequenceId()); 208 209 final DeliveryState remoteState = delivery.getRemoteState(); 210 if (remoteState != null && remoteState instanceof TransactionalState) { 211 TransactionalState txState = (TransactionalState) remoteState; 212 TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); 213 session.enlist(txId); 214 message.setTransactionId(txId); 215 } 216 217 message.onSend(); 218 if (!delivery.remotelySettled()) { 219 sendToActiveMQ(message, new ResponseHandler() { 220 221 @Override 222 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 223 if (response.isException()) { 224 ExceptionResponse error = 
(ExceptionResponse) response; 225 Rejected rejected = new Rejected(); 226 ErrorCondition condition = new ErrorCondition(); 227 228 if (error.getException() instanceof SecurityException) { 229 condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS); 230 } else if (error.getException() instanceof ResourceAllocationException) { 231 condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); 232 } else { 233 condition.setCondition(Symbol.valueOf("failed")); 234 } 235 236 condition.setDescription(error.getException().getMessage()); 237 rejected.setError(condition); 238 delivery.disposition(rejected); 239 } else { 240 if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) { 241 LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); 242 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); 243 } 244 245 if (remoteState != null && remoteState instanceof TransactionalState) { 246 TransactionalState txAccepted = new TransactionalState(); 247 txAccepted.setOutcome(Accepted.getInstance()); 248 txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId()); 249 250 delivery.disposition(txAccepted); 251 } else { 252 delivery.disposition(Accepted.getInstance()); 253 } 254 } 255 256 delivery.settle(); 257 session.pumpProtonToSocket(); 258 } 259 }); 260 } else { 261 if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) { 262 LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); 263 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); 264 session.pumpProtonToSocket(); 265 } 266 267 delivery.settle(); 268 sendToActiveMQ(message); 269 } 270 } 271 } 272}