001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
020
021import java.io.IOException;
022
023import javax.jms.Destination;
024import javax.jms.ResourceAllocationException;
025
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ActiveMQMessage;
028import org.apache.activemq.command.ExceptionResponse;
029import org.apache.activemq.command.LocalTransactionId;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.command.ProducerId;
032import org.apache.activemq.command.ProducerInfo;
033import org.apache.activemq.command.RemoveInfo;
034import org.apache.activemq.command.Response;
035import org.apache.activemq.command.TransactionId;
036import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
037import org.apache.activemq.transport.amqp.ResponseHandler;
038import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
039import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
040import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
041import org.apache.activemq.transport.amqp.message.EncodedMessage;
042import org.apache.activemq.transport.amqp.message.InboundTransformer;
043import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
044import org.apache.activemq.util.LongSequenceGenerator;
045import org.apache.qpid.proton.amqp.Symbol;
046import org.apache.qpid.proton.amqp.messaging.Accepted;
047import org.apache.qpid.proton.amqp.messaging.Rejected;
048import org.apache.qpid.proton.amqp.transaction.TransactionalState;
049import org.apache.qpid.proton.amqp.transport.AmqpError;
050import org.apache.qpid.proton.amqp.transport.DeliveryState;
051import org.apache.qpid.proton.amqp.transport.ErrorCondition;
052import org.apache.qpid.proton.engine.Delivery;
053import org.apache.qpid.proton.engine.Receiver;
054import org.fusesource.hawtbuf.Buffer;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer
060 * which holds the corresponding Sender which transfers message accross the
061 * link.  The AmqpReceiver handles all incoming deliveries by converting them
062 * or wrapping them into an ActiveMQ message object and forwarding that message
063 * on to the appropriate ActiveMQ Destination.
064 */
065public class AmqpReceiver extends AmqpAbstractReceiver {
066
067    private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
068
069    private final ProducerInfo producerInfo;
070    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
071
072    private InboundTransformer inboundTransformer;
073
074    /**
075     * Create a new instance of an AmqpReceiver
076     *
077     * @param session
078     *        the Session that is the parent of this AmqpReceiver instance.
079     * @param endpoint
080     *        the AMQP receiver endpoint that the class manages.
081     * @param producerInfo
082     *        the ProducerInfo instance that contains this sender's configuration.
083     */
084    public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) {
085        super(session, endpoint);
086
087        this.producerInfo = producerInfo;
088    }
089
090    @Override
091    public void close() {
092        if (!isClosed() && isOpened()) {
093            sendToActiveMQ(new RemoveInfo(getProducerId()));
094        }
095
096        super.close();
097    }
098
099    //----- Configuration accessors ------------------------------------------//
100
101    /**
102     * @return the ActiveMQ ProducerId used to register this Receiver on the Broker.
103     */
104    public ProducerId getProducerId() {
105        return producerInfo.getProducerId();
106    }
107
108    @Override
109    public ActiveMQDestination getDestination() {
110        return producerInfo.getDestination();
111    }
112
113    @Override
114    public void setDestination(ActiveMQDestination destination) {
115        producerInfo.setDestination(destination);
116    }
117
118    /**
119     * If the Sender that initiated this Receiver endpoint did not define an address
120     * then it is using anonymous mode and message are to be routed to the address
121     * that is defined in the AMQP message 'To' field.
122     *
123     * @return true if this Receiver should operate in anonymous mode.
124     */
125    public boolean isAnonymous() {
126        return producerInfo.getDestination() == null;
127    }
128
129    //----- Internal Implementation ------------------------------------------//
130
131    protected InboundTransformer getTransformer() {
132        if (inboundTransformer == null) {
133            String transformer = session.getConnection().getConfiguredTransformer();
134            if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
135                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
136            } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) {
137                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
138            } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) {
139                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
140            } else {
141                LOG.warn("Unknown transformer type {} using native one instead", transformer);
142                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
143            }
144        }
145        return inboundTransformer;
146    }
147
148    @Override
149    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
150        if (!isClosed()) {
151            EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
152
153            InboundTransformer transformer = getTransformer();
154            ActiveMQMessage message = null;
155
156            while (transformer != null) {
157                try {
158                    message = (ActiveMQMessage) transformer.transform(em);
159                    break;
160                } catch (Exception e) {
161                    LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
162                    LOG.trace("Transformation error:", e);
163
164                    transformer = transformer.getFallbackTransformer();
165                }
166            }
167
168            if (message == null) {
169                throw new IOException("Failed to transform incoming delivery, skipping.");
170            }
171
172            current = null;
173
174            if (isAnonymous()) {
175                Destination toDestination = message.getJMSDestination();
176                if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
177                    Rejected rejected = new Rejected();
178                    ErrorCondition condition = new ErrorCondition();
179                    condition.setCondition(Symbol.valueOf("failed"));
180                    condition.setDescription("Missing to field for message sent to an anonymous producer");
181                    rejected.setError(condition);
182                    delivery.disposition(rejected);
183                    return;
184                }
185            } else {
186                message.setJMSDestination(getDestination());
187            }
188
189            message.setProducerId(getProducerId());
190
191            // Always override the AMQP client's MessageId with our own.  Preserve
192            // the original in the TextView property for later Ack.
193            MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId());
194
195            MessageId amqpMessageId = message.getMessageId();
196            if (amqpMessageId != null) {
197                if (amqpMessageId.getTextView() != null) {
198                    messageId.setTextView(amqpMessageId.getTextView());
199                } else {
200                    messageId.setTextView(amqpMessageId.toString());
201                }
202            }
203
204            message.setMessageId(messageId);
205
206            LOG.trace("Inbound Message:{} from Producer:{}",
207                      message.getMessageId(), getProducerId() + ":" + messageId.getProducerSequenceId());
208
209            final DeliveryState remoteState = delivery.getRemoteState();
210            if (remoteState != null && remoteState instanceof TransactionalState) {
211                TransactionalState txState = (TransactionalState) remoteState;
212                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
213                session.enlist(txId);
214                message.setTransactionId(txId);
215            }
216
217            message.onSend();
218            if (!delivery.remotelySettled()) {
219                sendToActiveMQ(message, new ResponseHandler() {
220
221                    @Override
222                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
223                        if (response.isException()) {
224                            ExceptionResponse error = (ExceptionResponse) response;
225                            Rejected rejected = new Rejected();
226                            ErrorCondition condition = new ErrorCondition();
227
228                            if (error.getException() instanceof SecurityException) {
229                                condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
230                            } else if (error.getException() instanceof ResourceAllocationException) {
231                                condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
232                            } else {
233                                condition.setCondition(Symbol.valueOf("failed"));
234                            }
235
236                            condition.setDescription(error.getException().getMessage());
237                            rejected.setError(condition);
238                            delivery.disposition(rejected);
239                        } else {
240                            if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) {
241                                LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
242                                getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
243                            }
244
245                            if (remoteState != null && remoteState instanceof TransactionalState) {
246                                TransactionalState txAccepted = new TransactionalState();
247                                txAccepted.setOutcome(Accepted.getInstance());
248                                txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
249
250                                delivery.disposition(txAccepted);
251                            } else {
252                                delivery.disposition(Accepted.getInstance());
253                            }
254                        }
255
256                        delivery.settle();
257                        session.pumpProtonToSocket();
258                    }
259                });
260            } else {
261                if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) {
262                    LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
263                    getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
264                    session.pumpProtonToSocket();
265                }
266
267                delivery.settle();
268                sendToActiveMQ(message);
269            }
270        }
271    }
272}