001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
020
021import java.io.IOException;
022import java.util.LinkedList;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import org.apache.activemq.broker.region.AbstractSubscription;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ActiveMQMessage;
028import org.apache.activemq.command.ConsumerControl;
029import org.apache.activemq.command.ConsumerId;
030import org.apache.activemq.command.ConsumerInfo;
031import org.apache.activemq.command.ExceptionResponse;
032import org.apache.activemq.command.LocalTransactionId;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.MessagePull;
036import org.apache.activemq.command.RemoveInfo;
037import org.apache.activemq.command.RemoveSubscriptionInfo;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.command.TransactionId;
040import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
041import org.apache.activemq.transport.amqp.ResponseHandler;
042import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
043import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
044import org.apache.activemq.transport.amqp.message.EncodedMessage;
045import org.apache.activemq.transport.amqp.message.OutboundTransformer;
046import org.apache.qpid.proton.amqp.messaging.Accepted;
047import org.apache.qpid.proton.amqp.messaging.Modified;
048import org.apache.qpid.proton.amqp.messaging.Outcome;
049import org.apache.qpid.proton.amqp.messaging.Rejected;
050import org.apache.qpid.proton.amqp.messaging.Released;
051import org.apache.qpid.proton.amqp.transaction.TransactionalState;
052import org.apache.qpid.proton.amqp.transport.AmqpError;
053import org.apache.qpid.proton.amqp.transport.DeliveryState;
054import org.apache.qpid.proton.amqp.transport.ErrorCondition;
055import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
056import org.apache.qpid.proton.engine.Delivery;
057import org.apache.qpid.proton.engine.Link;
058import org.apache.qpid.proton.engine.Sender;
059import org.fusesource.hawtbuf.Buffer;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
/**
 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
 * which holds the corresponding Receiver which receives messages transferred
 * across the link from the Broker.
 *
 * An AmqpSender is in turn a message consumer subscribed to some destination
 * on the broker.  As messages are dispatched to this sender they are sent on
 * to the remote Receiver end of the link.
 */
072public class AmqpSender extends AmqpAbstractLink<Sender> {
073
074    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
075
076    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
077
078    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
079    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
080    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
081    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
082    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
083
084    private final ConsumerInfo consumerInfo;
085    private AbstractSubscription subscription;
086    private AtomicInteger prefetchExtension;
087    private int currentCreditRequest;
088    private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective
089    private final boolean presettle;
090
091    private boolean draining;
092    private long lastDeliveredSequenceId;
093
094    private Buffer currentBuffer;
095    private Delivery currentDelivery;
096
097    /**
098     * Creates a new AmqpSender instance that manages the given Sender
099     *
100     * @param session
101     *        the AmqpSession object that is the parent of this instance.
102     * @param endpoint
103     *        the AMQP Sender instance that this class manages.
104     * @param consumerInfo
105     *        the ConsumerInfo instance that holds configuration for this sender.
106     */
107    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
108        super(session, endpoint);
109
110        this.consumerInfo = consumerInfo;
111        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
112    }
113
114    @Override
115    public void open() {
116        if (!isClosed()) {
117            session.registerSender(getConsumerId(), this);
118            subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo);
119            prefetchExtension = subscription.getPrefetchExtension();
120        }
121
122        super.open();
123    }
124
125    @Override
126    public void detach() {
127        if (!isClosed() && isOpened()) {
128            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
129            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
130            sendToActiveMQ(removeCommand);
131
132            session.unregisterSender(getConsumerId());
133        }
134
135        super.detach();
136    }
137
138    @Override
139    public void close() {
140        if (!isClosed() && isOpened()) {
141            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
142            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
143            sendToActiveMQ(removeCommand);
144
145            if (consumerInfo.isDurable()) {
146                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
147                rsi.setConnectionId(session.getConnection().getConnectionId());
148                rsi.setSubscriptionName(getEndpoint().getName());
149                rsi.setClientId(session.getConnection().getClientId());
150
151                sendToActiveMQ(rsi);
152            }
153
154            session.unregisterSender(getConsumerId());
155        }
156
157        super.close();
158    }
159
160    @Override
161    public void flow() throws Exception {
162        Link endpoint = getEndpoint();
163        if (LOG.isTraceEnabled()) {
164            LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}",
165                    draining, endpoint.getDrain(),
166                    endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription);
167        }
168
169        final int endpointCredit = endpoint.getCredit();
170        if (endpoint.getDrain() && !draining) {
171
172            if (endpointCredit > 0) {
173                draining = true;
174
175                // Now request dispatch of the drain amount, we request immediate
176                // timeout and an completion message regardless so that we can know
177                // when we should marked the link as drained.
178                MessagePull pullRequest = new MessagePull();
179                pullRequest.setConsumerId(getConsumerId());
180                pullRequest.setDestination(getDestination());
181                pullRequest.setTimeout(-1);
182                pullRequest.setAlwaysSignalDone(true);
183                pullRequest.setQuantity(endpointCredit);
184
185                LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit);
186
187                sendToActiveMQ(pullRequest);
188            } else {
189                LOG.trace("Pull case -> sending any Queued messages and marking drained");
190
191                pumpOutbound();
192                getEndpoint().drained();
193                session.pumpProtonToSocket();
194                currentCreditRequest = 0;
195                logicalDeliveryCount = 0;
196            }
197        } else if (endpointCredit >= 0) {
198
199            if (endpointCredit == 0 && currentCreditRequest != 0) {
200
201                prefetchExtension.set(0);
202                currentCreditRequest = 0;
203                logicalDeliveryCount = 0;
204                LOG.trace("Flow: credit 0 for sub:" + subscription);
205
206            } else {
207
208                int deltaToAdd = endpointCredit;
209                int logicalCredit = currentCreditRequest - logicalDeliveryCount;
210                if (logicalCredit > 0) {
211                    deltaToAdd -= logicalCredit;
212                } else {
213                    // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
214                    logicalDeliveryCount = 0;
215                }
216                if (deltaToAdd > 0) {
217                    currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
218                    subscription.wakeupDestinationsForDispatch();
219                    // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
220                    subscription.setPrefetchSize(0);
221                    LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
222                }
223            }
224        }
225    }
226
227    @Override
228    public void delivery(Delivery delivery) throws Exception {
229        MessageDispatch md = (MessageDispatch) delivery.getContext();
230        DeliveryState state = delivery.getRemoteState();
231
232        if (state instanceof TransactionalState) {
233            TransactionalState txState = (TransactionalState) state;
234            LOG.trace("onDelivery: TX delivery state = {}", state);
235            if (txState.getOutcome() != null) {
236                Outcome outcome = txState.getOutcome();
237                if (outcome instanceof Accepted) {
238                    if (!delivery.remotelySettled()) {
239                        TransactionalState txAccepted = new TransactionalState();
240                        txAccepted.setOutcome(Accepted.getInstance());
241                        txAccepted.setTxnId(((TransactionalState) state).getTxnId());
242
243                        delivery.disposition(txAccepted);
244                    }
245                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
246                }
247            }
248        } else {
249            if (state instanceof Accepted) {
250                LOG.trace("onDelivery: accepted state = {}", state);
251                if (!delivery.remotelySettled()) {
252                    delivery.disposition(new Accepted());
253                }
254                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
255            } else if (state instanceof Rejected) {
256                // Rejection is a terminal outcome, we poison the message for dispatch to
257                // the DLQ.  If a custom redelivery policy is used on the broker the message
258                // can still be redelivered based on the configation of that policy.
259                LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state, md.getRedeliveryCounter());
260                settle(delivery, MessageAck.POSION_ACK_TYPE);
261            } else if (state instanceof Released) {
262                LOG.trace("onDelivery: Released state = {}", state);
263                // re-deliver && don't increment the counter.
264                settle(delivery, -1);
265            } else if (state instanceof Modified) {
266                Modified modified = (Modified) state;
267                if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
268                    // increment delivery counter..
269                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
270                }
271                LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
272                byte ackType = -1;
273                Boolean undeliverableHere = modified.getUndeliverableHere();
274                if (undeliverableHere != null && undeliverableHere) {
275                    // receiver does not want the message..
276                    // perhaps we should DLQ it?
277                    ackType = MessageAck.POSION_ACK_TYPE;
278                }
279                settle(delivery, ackType);
280            }
281        }
282
283        pumpOutbound();
284    }
285
286    @Override
287    public void commit() throws Exception {
288        if (!dispatchedInTx.isEmpty()) {
289            for (MessageDispatch md : dispatchedInTx) {
290                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
291                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
292                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
293
294                LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
295
296                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
297                    @Override
298                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
299                        if (response.isException()) {
300                            Throwable exception = ((ExceptionResponse) response).getException();
301                            exception.printStackTrace();
302                            getEndpoint().close();
303                        }
304                        session.pumpProtonToSocket();
305                    }
306                });
307            }
308
309            dispatchedInTx.clear();
310        }
311    }
312
313    @Override
314    public void rollback() throws Exception {
315        synchronized (outbound) {
316
317            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
318
319            for (MessageDispatch dispatch : dispatchedInTx) {
320                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
321                dispatch.getMessage().setTransactionId(null);
322                outbound.addFirst(dispatch);
323            }
324
325            dispatchedInTx.clear();
326        }
327    }
328
329    /**
330     * Event point for incoming message from ActiveMQ on this Sender's
331     * corresponding subscription.
332     *
333     * @param dispatch
334     *        the MessageDispatch to process and send across the link.
335     *
336     * @throws Exception if an error occurs while encoding the message for send.
337     */
338    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
339        if (!isClosed()) {
340            // Lock to prevent stepping on TX redelivery
341            synchronized (outbound) {
342                outbound.addLast(dispatch);
343            }
344            pumpOutbound();
345            session.pumpProtonToSocket();
346        }
347    }
348
349    /**
350     * Called when the Broker sends a ConsumerControl command to the Consumer that
351     * this sender creates to obtain messages to dispatch via the sender for this
352     * end of the open link.
353     *
354     * @param control
355     *        The ConsumerControl command to process.
356     */
357    public void onConsumerControl(ConsumerControl control) {
358        if (control.isClose()) {
359            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
360            session.pumpProtonToSocket();
361        }
362    }
363
364    @Override
365    public String toString() {
366        return "AmqpSender {" + getConsumerId() + "}";
367    }
368
369    //----- Property getters and setters -------------------------------------//
370
371    public ConsumerId getConsumerId() {
372        return consumerInfo.getConsumerId();
373    }
374
375    @Override
376    public ActiveMQDestination getDestination() {
377        return consumerInfo.getDestination();
378    }
379
380    @Override
381    public void setDestination(ActiveMQDestination destination) {
382        consumerInfo.setDestination(destination);
383    }
384
385    //----- Internal Implementation ------------------------------------------//
386
387    public void pumpOutbound() throws Exception {
388        while (!isClosed()) {
389            while (currentBuffer != null) {
390                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
391                if (sent > 0) {
392                    currentBuffer.moveHead(sent);
393                    if (currentBuffer.length == 0) {
394                        if (presettle) {
395                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
396                        } else {
397                            getEndpoint().advance();
398                        }
399                        currentBuffer = null;
400                        currentDelivery = null;
401                        logicalDeliveryCount++;
402                    }
403                } else {
404                    return;
405                }
406            }
407
408            if (outbound.isEmpty()) {
409                return;
410            }
411
412            final MessageDispatch md = outbound.removeFirst();
413            try {
414
415                ActiveMQMessage temp = null;
416                if (md.getMessage() != null) {
417                    // topic sub conversion needs own copy to read and any conversion (read) can
418                    // compete with concurrentStoreAndDispatch marshall
419                    temp = (ActiveMQMessage) md.getMessage().copy();
420                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
421                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
422                    }
423                }
424
425                final ActiveMQMessage jms = temp;
426                if (jms == null) {
427                    LOG.trace("Sender:[{}] browse done.", getEndpoint().getName());
428                    // It's the end of browse signal in response to a MessagePull
429                    getEndpoint().drained();
430                    draining = false;
431                    currentCreditRequest = 0;
432                    logicalDeliveryCount = 0;
433                } else {
434                    if (LOG.isTraceEnabled()) {
435                        LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
436                                  getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
437                                  getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
438                    }
439
440                    if (draining && getEndpoint().getCredit() == 0) {
441                        LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
442                        getEndpoint().drained();
443                        draining = false;
444                        currentCreditRequest = 0;
445                        logicalDeliveryCount = 0;
446                    }
447
448                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
449                    jms.setReadOnlyBody(true);
450                    final EncodedMessage amqp = outboundTransformer.transform(jms);
451                    if (amqp != null && amqp.getLength() > 0) {
452                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
453                        if (presettle) {
454                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
455                        } else {
456                            final byte[] tag = tagCache.getNextTag();
457                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
458                        }
459                        currentDelivery.setContext(md);
460                    } else {
461                        // TODO: message could not be generated what now?
462                    }
463                }
464            } catch (Exception e) {
465                LOG.warn("Error detected while flushing outbound messages: {} {}", e, e.getMessage());
466                LOG.debug("Exception:", e);
467            }
468        }
469    }
470
471    private void settle(final Delivery delivery, final int ackType) throws Exception {
472        byte[] tag = delivery.getTag();
473        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
474            tagCache.returnTag(tag);
475        }
476
477        if (ackType == -1) {
478            // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
479            delivery.settle();
480            onMessageDispatch((MessageDispatch) delivery.getContext());
481        } else {
482            MessageDispatch md = (MessageDispatch) delivery.getContext();
483            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
484            MessageAck ack = new MessageAck();
485            ack.setConsumerId(getConsumerId());
486            ack.setFirstMessageId(md.getMessage().getMessageId());
487            ack.setLastMessageId(md.getMessage().getMessageId());
488            ack.setMessageCount(1);
489            ack.setAckType((byte) ackType);
490            ack.setDestination(md.getDestination());
491
492            DeliveryState remoteState = delivery.getRemoteState();
493            if (remoteState != null && remoteState instanceof TransactionalState) {
494                TransactionalState txState = (TransactionalState) remoteState;
495                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
496                ack.setTransactionId(txId);
497
498                // Store the message sent in this TX we might need to re-send on rollback
499                session.enlist(txId);
500                md.getMessage().setTransactionId(txId);
501                dispatchedInTx.addFirst(md);
502            }
503
504            LOG.trace("Sending Ack to ActiveMQ: {}", ack);
505
506            sendToActiveMQ(ack, new ResponseHandler() {
507                @Override
508                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
509                    if (response.isException()) {
510                        if (response.isException()) {
511                            Throwable exception = ((ExceptionResponse) response).getException();
512                            exception.printStackTrace();
513                            getEndpoint().close();
514                        }
515                    } else {
516                        delivery.settle();
517                    }
518                    session.pumpProtonToSocket();
519                }
520            });
521        }
522    }
523}