001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
020
021import java.io.IOException;
022import java.util.LinkedList;
023
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ActiveMQMessage;
026import org.apache.activemq.command.ConsumerControl;
027import org.apache.activemq.command.ConsumerId;
028import org.apache.activemq.command.ConsumerInfo;
029import org.apache.activemq.command.ExceptionResponse;
030import org.apache.activemq.command.LocalTransactionId;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageDispatch;
033import org.apache.activemq.command.MessagePull;
034import org.apache.activemq.command.RemoveInfo;
035import org.apache.activemq.command.RemoveSubscriptionInfo;
036import org.apache.activemq.command.Response;
037import org.apache.activemq.command.TransactionId;
038import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
039import org.apache.activemq.transport.amqp.ResponseHandler;
040import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
041import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
042import org.apache.activemq.transport.amqp.message.EncodedMessage;
043import org.apache.activemq.transport.amqp.message.OutboundTransformer;
044import org.apache.qpid.proton.amqp.messaging.Accepted;
045import org.apache.qpid.proton.amqp.messaging.Modified;
046import org.apache.qpid.proton.amqp.messaging.Outcome;
047import org.apache.qpid.proton.amqp.messaging.Rejected;
048import org.apache.qpid.proton.amqp.messaging.Released;
049import org.apache.qpid.proton.amqp.transaction.TransactionalState;
050import org.apache.qpid.proton.amqp.transport.AmqpError;
051import org.apache.qpid.proton.amqp.transport.DeliveryState;
052import org.apache.qpid.proton.amqp.transport.ErrorCondition;
053import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
054import org.apache.qpid.proton.engine.Delivery;
055import org.apache.qpid.proton.engine.Sender;
056import org.fusesource.hawtbuf.Buffer;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
062 * which holds the corresponding Receiver which receives messages transfered
063 * across the link from the Broker.
064 *
065 * An AmqpSender is in turn a message consumer subscribed to some destination
066 * on the broker.  As messages are dispatched to this sender that are sent on
067 * to the remote Receiver end of the lin.
068 */
069public class AmqpSender extends AmqpAbstractLink<Sender> {
070
071    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
072
073    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
074
075    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
076    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
077    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
078    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
079    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
080
081    private final ConsumerInfo consumerInfo;
082    private final boolean presettle;
083
084    private boolean draining;
085    private long lastDeliveredSequenceId;
086
087    private Buffer currentBuffer;
088    private Delivery currentDelivery;
089
090    /**
091     * Creates a new AmqpSender instance that manages the given Sender
092     *
093     * @param session
094     *        the AmqpSession object that is the parent of this instance.
095     * @param endpoint
096     *        the AMQP Sender instance that this class manages.
097     * @param consumerInfo
098     *        the ConsumerInfo instance that holds configuration for this sender.
099     */
100    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
101        super(session, endpoint);
102
103        this.consumerInfo = consumerInfo;
104        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
105    }
106
107    @Override
108    public void open() {
109        if (!isClosed()) {
110            session.registerSender(getConsumerId(), this);
111        }
112
113        super.open();
114    }
115
116    @Override
117    public void detach() {
118        if (!isClosed() && isOpened()) {
119            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
120            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
121            sendToActiveMQ(removeCommand);
122
123            session.unregisterSender(getConsumerId());
124        }
125
126        super.detach();
127    }
128
129    @Override
130    public void close() {
131        if (!isClosed() && isOpened()) {
132            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
133            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
134            sendToActiveMQ(removeCommand);
135
136            if (consumerInfo.isDurable()) {
137                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
138                rsi.setConnectionId(session.getConnection().getConnectionId());
139                rsi.setSubscriptionName(getEndpoint().getName());
140                rsi.setClientId(session.getConnection().getClientId());
141
142                sendToActiveMQ(rsi);
143            }
144
145            session.unregisterSender(getConsumerId());
146        }
147
148        super.close();
149    }
150
151    @Override
152    public void flow() throws Exception {
153        if (LOG.isTraceEnabled()) {
154            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
155                      draining, getEndpoint().getDrain(),
156                      getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
157        }
158
159        if (getEndpoint().getDrain() && !draining) {
160
161            // Revert to a pull consumer.
162            ConsumerControl control = new ConsumerControl();
163            control.setConsumerId(getConsumerId());
164            control.setDestination(getDestination());
165            control.setPrefetch(0);
166
167            LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
168
169            sendToActiveMQ(control);
170
171            if (endpoint.getCredit() > 0) {
172                draining = true;
173
174                // Now request dispatch of the drain amount, we request immediate
175                // timeout and an completion message regardless so that we can know
176                // when we should marked the link as drained.
177                MessagePull pullRequest = new MessagePull();
178                pullRequest.setConsumerId(getConsumerId());
179                pullRequest.setDestination(getDestination());
180                pullRequest.setTimeout(-1);
181                pullRequest.setAlwaysSignalDone(true);
182                pullRequest.setQuantity(endpoint.getCredit());
183
184                LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
185
186                sendToActiveMQ(pullRequest);
187            } else {
188                LOG.trace("Pull case -> sending any Queued messages and marking drained");
189
190                pumpOutbound();
191                getEndpoint().drained();
192                session.pumpProtonToSocket();
193            }
194        } else {
195            ConsumerControl control = new ConsumerControl();
196            control.setConsumerId(getConsumerId());
197            control.setDestination(getDestination());
198            control.setPrefetch(getEndpoint().getCredit());
199
200            LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
201
202            sendToActiveMQ(control);
203        }
204    }
205
206    @Override
207    public void delivery(Delivery delivery) throws Exception {
208        MessageDispatch md = (MessageDispatch) delivery.getContext();
209        DeliveryState state = delivery.getRemoteState();
210
211        if (state instanceof TransactionalState) {
212            TransactionalState txState = (TransactionalState) state;
213            LOG.trace("onDelivery: TX delivery state = {}", state);
214            if (txState.getOutcome() != null) {
215                Outcome outcome = txState.getOutcome();
216                if (outcome instanceof Accepted) {
217                    if (!delivery.remotelySettled()) {
218                        TransactionalState txAccepted = new TransactionalState();
219                        txAccepted.setOutcome(Accepted.getInstance());
220                        txAccepted.setTxnId(((TransactionalState) state).getTxnId());
221
222                        delivery.disposition(txAccepted);
223                    }
224                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
225                }
226            }
227        } else {
228            if (state instanceof Accepted) {
229                LOG.trace("onDelivery: accepted state = {}", state);
230                if (!delivery.remotelySettled()) {
231                    delivery.disposition(new Accepted());
232                }
233                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
234            } else if (state instanceof Rejected) {
235                // re-deliver /w incremented delivery counter.
236                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
237                LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
238                settle(delivery, -1);
239            } else if (state instanceof Released) {
240                LOG.trace("onDelivery: Released state = {}", state);
241                // re-deliver && don't increment the counter.
242                settle(delivery, -1);
243            } else if (state instanceof Modified) {
244                Modified modified = (Modified) state;
245                if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
246                    // increment delivery counter..
247                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
248                }
249                LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
250                byte ackType = -1;
251                Boolean undeliverableHere = modified.getUndeliverableHere();
252                if (undeliverableHere != null && undeliverableHere) {
253                    // receiver does not want the message..
254                    // perhaps we should DLQ it?
255                    ackType = MessageAck.POSION_ACK_TYPE;
256                }
257                settle(delivery, ackType);
258            }
259        }
260
261        pumpOutbound();
262    }
263
264    @Override
265    public void commit() throws Exception {
266        if (!dispatchedInTx.isEmpty()) {
267            for (MessageDispatch md : dispatchedInTx) {
268                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
269                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
270                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
271
272                LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
273
274                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
275                    @Override
276                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
277                        if (response.isException()) {
278                            Throwable exception = ((ExceptionResponse) response).getException();
279                            exception.printStackTrace();
280                            getEndpoint().close();
281                        }
282                        session.pumpProtonToSocket();
283                    }
284                });
285            }
286
287            dispatchedInTx.clear();
288        }
289    }
290
291    @Override
292    public void rollback() throws Exception {
293        synchronized (outbound) {
294
295            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
296
297            for (MessageDispatch dispatch : dispatchedInTx) {
298                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
299                dispatch.getMessage().setTransactionId(null);
300                outbound.addFirst(dispatch);
301            }
302
303            dispatchedInTx.clear();
304        }
305    }
306
307    /**
308     * Event point for incoming message from ActiveMQ on this Sender's
309     * corresponding subscription.
310     *
311     * @param dispatch
312     *        the MessageDispatch to process and send across the link.
313     *
314     * @throws Exception if an error occurs while encoding the message for send.
315     */
316    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
317        if (!isClosed()) {
318            // Lock to prevent stepping on TX redelivery
319            synchronized (outbound) {
320                outbound.addLast(dispatch);
321            }
322            pumpOutbound();
323            session.pumpProtonToSocket();
324        }
325    }
326
327    /**
328     * Called when the Broker sends a ConsumerControl command to the Consumer that
329     * this sender creates to obtain messages to dispatch via the sender for this
330     * end of the open link.
331     *
332     * @param control
333     *        The ConsumerControl command to process.
334     */
335    public void onConsumerControl(ConsumerControl control) {
336        if (control.isClose()) {
337            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
338            session.pumpProtonToSocket();
339        }
340    }
341
342    @Override
343    public String toString() {
344        return "AmqpSender {" + getConsumerId() + "}";
345    }
346
347    //----- Property getters and setters -------------------------------------//
348
349    public ConsumerId getConsumerId() {
350        return consumerInfo.getConsumerId();
351    }
352
353    @Override
354    public ActiveMQDestination getDestination() {
355        return consumerInfo.getDestination();
356    }
357
358    @Override
359    public void setDestination(ActiveMQDestination destination) {
360        consumerInfo.setDestination(destination);
361    }
362
363    //----- Internal Implementation ------------------------------------------//
364
365    public void pumpOutbound() throws Exception {
366        while (!isClosed()) {
367            while (currentBuffer != null) {
368                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
369                if (sent > 0) {
370                    currentBuffer.moveHead(sent);
371                    if (currentBuffer.length == 0) {
372                        if (presettle) {
373                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
374                        } else {
375                            getEndpoint().advance();
376                        }
377                        currentBuffer = null;
378                        currentDelivery = null;
379                    }
380                } else {
381                    return;
382                }
383            }
384
385            if (outbound.isEmpty()) {
386                return;
387            }
388
389            final MessageDispatch md = outbound.removeFirst();
390            try {
391
392                ActiveMQMessage temp = null;
393                if (md.getMessage() != null) {
394
395                    // Topics can dispatch the same Message to more than one consumer
396                    // so we must copy to prevent concurrent read / write to the same
397                    // message object.
398                    if (md.getDestination().isTopic()) {
399                        synchronized (md.getMessage()) {
400                            temp = (ActiveMQMessage) md.getMessage().copy();
401                        }
402                    } else {
403                        temp = (ActiveMQMessage) md.getMessage();
404                    }
405
406                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
407                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
408                    }
409                }
410
411                final ActiveMQMessage jms = temp;
412                if (jms == null) {
413                    LOG.trace("Sender:[{}] browse done.", getEndpoint().getName());
414                    // It's the end of browse signal in response to a MessagePull
415                    getEndpoint().drained();
416                    draining = false;
417                } else {
418                    if (LOG.isTraceEnabled()) {
419                        LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
420                                  getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
421                                  getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
422                    }
423
424                    if (draining && getEndpoint().getCredit() == 0) {
425                        LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
426                        getEndpoint().drained();
427                        draining = false;
428                    }
429
430                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
431                    jms.setReadOnlyBody(true);
432                    final EncodedMessage amqp = outboundTransformer.transform(jms);
433                    if (amqp != null && amqp.getLength() > 0) {
434                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
435                        if (presettle) {
436                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
437                        } else {
438                            final byte[] tag = tagCache.getNextTag();
439                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
440                        }
441                        currentDelivery.setContext(md);
442                    } else {
443                        // TODO: message could not be generated what now?
444                    }
445                }
446            } catch (Exception e) {
447                LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage());
448            }
449        }
450    }
451
452    private void settle(final Delivery delivery, final int ackType) throws Exception {
453        byte[] tag = delivery.getTag();
454        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
455            tagCache.returnTag(tag);
456        }
457
458        int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
459        LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.",
460                  getEndpoint().getName(), newCredit);
461
462        ConsumerControl control = new ConsumerControl();
463        control.setConsumerId(getConsumerId());
464        control.setDestination(getDestination());
465        control.setPrefetch(newCredit);
466
467        sendToActiveMQ(control);
468
469        if (ackType == -1) {
470            // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
471            delivery.settle();
472            onMessageDispatch((MessageDispatch) delivery.getContext());
473        } else {
474            MessageDispatch md = (MessageDispatch) delivery.getContext();
475            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
476            MessageAck ack = new MessageAck();
477            ack.setConsumerId(getConsumerId());
478            ack.setFirstMessageId(md.getMessage().getMessageId());
479            ack.setLastMessageId(md.getMessage().getMessageId());
480            ack.setMessageCount(1);
481            ack.setAckType((byte) ackType);
482            ack.setDestination(md.getDestination());
483
484            DeliveryState remoteState = delivery.getRemoteState();
485            if (remoteState != null && remoteState instanceof TransactionalState) {
486                TransactionalState txState = (TransactionalState) remoteState;
487                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
488                ack.setTransactionId(txId);
489
490                // Store the message sent in this TX we might need to re-send on rollback
491                session.enlist(txId);
492                md.getMessage().setTransactionId(txId);
493                dispatchedInTx.addFirst(md);
494            }
495
496            LOG.trace("Sending Ack to ActiveMQ: {}", ack);
497
498            sendToActiveMQ(ack, new ResponseHandler() {
499                @Override
500                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
501                    if (response.isException()) {
502                        if (response.isException()) {
503                            Throwable exception = ((ExceptionResponse) response).getException();
504                            exception.printStackTrace();
505                            getEndpoint().close();
506                        }
507                    } else {
508                        delivery.settle();
509                    }
510                    session.pumpProtonToSocket();
511                }
512            });
513        }
514    }
515}