001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
020
021import java.io.IOException;
022import java.util.LinkedList;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import org.apache.activemq.broker.region.AbstractSubscription;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ActiveMQMessage;
028import org.apache.activemq.command.ConsumerControl;
029import org.apache.activemq.command.ConsumerId;
030import org.apache.activemq.command.ConsumerInfo;
031import org.apache.activemq.command.ExceptionResponse;
032import org.apache.activemq.command.LocalTransactionId;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.MessagePull;
036import org.apache.activemq.command.RemoveInfo;
037import org.apache.activemq.command.RemoveSubscriptionInfo;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.command.TransactionId;
040import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
041import org.apache.activemq.transport.amqp.ResponseHandler;
042import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
043import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
044import org.apache.activemq.transport.amqp.message.EncodedMessage;
045import org.apache.activemq.transport.amqp.message.OutboundTransformer;
046import org.apache.qpid.proton.amqp.messaging.Accepted;
047import org.apache.qpid.proton.amqp.messaging.Modified;
048import org.apache.qpid.proton.amqp.messaging.Outcome;
049import org.apache.qpid.proton.amqp.messaging.Rejected;
050import org.apache.qpid.proton.amqp.messaging.Released;
051import org.apache.qpid.proton.amqp.transaction.TransactionalState;
052import org.apache.qpid.proton.amqp.transport.AmqpError;
053import org.apache.qpid.proton.amqp.transport.DeliveryState;
054import org.apache.qpid.proton.amqp.transport.ErrorCondition;
055import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
056import org.apache.qpid.proton.engine.Delivery;
057import org.apache.qpid.proton.engine.Link;
058import org.apache.qpid.proton.engine.Sender;
059import org.fusesource.hawtbuf.Buffer;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
065 * which holds the corresponding Receiver which receives messages transfered
066 * across the link from the Broker.
067 *
068 * An AmqpSender is in turn a message consumer subscribed to some destination
069 * on the broker.  As messages are dispatched to this sender that are sent on
070 * to the remote Receiver end of the lin.
071 */
072public class AmqpSender extends AmqpAbstractLink<Sender> {
073
074    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
075
076    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
077
078    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
079    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
080    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
081    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
082    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
083
084    private final ConsumerInfo consumerInfo;
085    private AbstractSubscription subscription;
086    private AtomicInteger prefetchExtension;
087    private int currentCreditRequest;
088    private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective
089    private final boolean presettle;
090
091    private boolean draining;
092    private long lastDeliveredSequenceId;
093
094    private Buffer currentBuffer;
095    private Delivery currentDelivery;
096
097    /**
098     * Creates a new AmqpSender instance that manages the given Sender
099     *
100     * @param session
101     *        the AmqpSession object that is the parent of this instance.
102     * @param endpoint
103     *        the AMQP Sender instance that this class manages.
104     * @param consumerInfo
105     *        the ConsumerInfo instance that holds configuration for this sender.
106     */
107    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
108        super(session, endpoint);
109
110        this.consumerInfo = consumerInfo;
111        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
112    }
113
114    @Override
115    public void open() {
116        if (!isClosed()) {
117            session.registerSender(getConsumerId(), this);
118            subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo);
119            prefetchExtension = subscription.getPrefetchExtension();
120        }
121
122        super.open();
123    }
124
125    @Override
126    public void detach() {
127        if (!isClosed() && isOpened()) {
128            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
129            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
130            sendToActiveMQ(removeCommand);
131
132            session.unregisterSender(getConsumerId());
133        }
134
135        super.detach();
136    }
137
138    @Override
139    public void close() {
140        if (!isClosed() && isOpened()) {
141            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
142            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
143            sendToActiveMQ(removeCommand);
144
145            if (consumerInfo.isDurable()) {
146                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
147                rsi.setConnectionId(session.getConnection().getConnectionId());
148                rsi.setSubscriptionName(getEndpoint().getName());
149                rsi.setClientId(session.getConnection().getClientId());
150
151                sendToActiveMQ(rsi);
152            }
153
154            session.unregisterSender(getConsumerId());
155        }
156
157        super.close();
158    }
159
160    @Override
161    public void flow() throws Exception {
162        Link endpoint = getEndpoint();
163        if (LOG.isTraceEnabled()) {
164            LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}",
165                    draining, endpoint.getDrain(),
166                    endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription);
167        }
168
169        final int endpointCredit = endpoint.getCredit();
170        if (endpoint.getDrain() && !draining) {
171
172            if (endpointCredit > 0) {
173                draining = true;
174
175                // Now request dispatch of the drain amount, we request immediate
176                // timeout and an completion message regardless so that we can know
177                // when we should marked the link as drained.
178                MessagePull pullRequest = new MessagePull();
179                pullRequest.setConsumerId(getConsumerId());
180                pullRequest.setDestination(getDestination());
181                pullRequest.setTimeout(-1);
182                pullRequest.setAlwaysSignalDone(true);
183                pullRequest.setQuantity(endpointCredit);
184
185                LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit);
186
187                sendToActiveMQ(pullRequest);
188            } else {
189                LOG.trace("Pull case -> sending any Queued messages and marking drained");
190
191                pumpOutbound();
192                getEndpoint().drained();
193                session.pumpProtonToSocket();
194                currentCreditRequest = 0;
195                logicalDeliveryCount = 0;
196            }
197        } else if (endpointCredit >= 0) {
198
199            if (endpointCredit == 0 && currentCreditRequest != 0) {
200
201                prefetchExtension.set(0);
202                currentCreditRequest = 0;
203                logicalDeliveryCount = 0;
204                LOG.trace("Flow: credit 0 for sub:" + subscription);
205
206            } else {
207
208                int deltaToAdd = endpointCredit;
209                int logicalCredit = currentCreditRequest - logicalDeliveryCount;
210                if (logicalCredit > 0) {
211                    deltaToAdd -= logicalCredit;
212                } else {
213                    // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
214                    logicalDeliveryCount = 0;
215                }
216                if (deltaToAdd > 0) {
217                    currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
218                    subscription.wakeupDestinationsForDispatch();
219                    // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
220                    subscription.setPrefetchSize(0);
221                    LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
222                }
223            }
224        }
225    }
226
227    @Override
228    public void delivery(Delivery delivery) throws Exception {
229        MessageDispatch md = (MessageDispatch) delivery.getContext();
230        DeliveryState state = delivery.getRemoteState();
231
232        if (state instanceof TransactionalState) {
233            TransactionalState txState = (TransactionalState) state;
234            LOG.trace("onDelivery: TX delivery state = {}", state);
235            if (txState.getOutcome() != null) {
236                Outcome outcome = txState.getOutcome();
237                if (outcome instanceof Accepted) {
238                    if (!delivery.remotelySettled()) {
239                        TransactionalState txAccepted = new TransactionalState();
240                        txAccepted.setOutcome(Accepted.getInstance());
241                        txAccepted.setTxnId(((TransactionalState) state).getTxnId());
242
243                        delivery.disposition(txAccepted);
244                    }
245                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
246                }
247            }
248        } else {
249            if (state instanceof Accepted) {
250                LOG.trace("onDelivery: accepted state = {}", state);
251                if (!delivery.remotelySettled()) {
252                    delivery.disposition(new Accepted());
253                }
254                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
255            } else if (state instanceof Rejected) {
256                // re-deliver /w incremented delivery counter.
257                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
258                LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
259                settle(delivery, -1);
260            } else if (state instanceof Released) {
261                LOG.trace("onDelivery: Released state = {}", state);
262                // re-deliver && don't increment the counter.
263                settle(delivery, -1);
264            } else if (state instanceof Modified) {
265                Modified modified = (Modified) state;
266                if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
267                    // increment delivery counter..
268                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
269                }
270                LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
271                byte ackType = -1;
272                Boolean undeliverableHere = modified.getUndeliverableHere();
273                if (undeliverableHere != null && undeliverableHere) {
274                    // receiver does not want the message..
275                    // perhaps we should DLQ it?
276                    ackType = MessageAck.POSION_ACK_TYPE;
277                }
278                settle(delivery, ackType);
279            }
280        }
281
282        pumpOutbound();
283    }
284
285    @Override
286    public void commit() throws Exception {
287        if (!dispatchedInTx.isEmpty()) {
288            for (MessageDispatch md : dispatchedInTx) {
289                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
290                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
291                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
292
293                LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
294
295                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
296                    @Override
297                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
298                        if (response.isException()) {
299                            Throwable exception = ((ExceptionResponse) response).getException();
300                            exception.printStackTrace();
301                            getEndpoint().close();
302                        }
303                        session.pumpProtonToSocket();
304                    }
305                });
306            }
307
308            dispatchedInTx.clear();
309        }
310    }
311
312    @Override
313    public void rollback() throws Exception {
314        synchronized (outbound) {
315
316            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
317
318            for (MessageDispatch dispatch : dispatchedInTx) {
319                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
320                dispatch.getMessage().setTransactionId(null);
321                outbound.addFirst(dispatch);
322            }
323
324            dispatchedInTx.clear();
325        }
326    }
327
328    /**
329     * Event point for incoming message from ActiveMQ on this Sender's
330     * corresponding subscription.
331     *
332     * @param dispatch
333     *        the MessageDispatch to process and send across the link.
334     *
335     * @throws Exception if an error occurs while encoding the message for send.
336     */
337    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
338        if (!isClosed()) {
339            // Lock to prevent stepping on TX redelivery
340            synchronized (outbound) {
341                outbound.addLast(dispatch);
342            }
343            pumpOutbound();
344            session.pumpProtonToSocket();
345        }
346    }
347
348    /**
349     * Called when the Broker sends a ConsumerControl command to the Consumer that
350     * this sender creates to obtain messages to dispatch via the sender for this
351     * end of the open link.
352     *
353     * @param control
354     *        The ConsumerControl command to process.
355     */
356    public void onConsumerControl(ConsumerControl control) {
357        if (control.isClose()) {
358            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
359            session.pumpProtonToSocket();
360        }
361    }
362
363    @Override
364    public String toString() {
365        return "AmqpSender {" + getConsumerId() + "}";
366    }
367
368    //----- Property getters and setters -------------------------------------//
369
370    public ConsumerId getConsumerId() {
371        return consumerInfo.getConsumerId();
372    }
373
374    @Override
375    public ActiveMQDestination getDestination() {
376        return consumerInfo.getDestination();
377    }
378
379    @Override
380    public void setDestination(ActiveMQDestination destination) {
381        consumerInfo.setDestination(destination);
382    }
383
384    //----- Internal Implementation ------------------------------------------//
385
386    public void pumpOutbound() throws Exception {
387        while (!isClosed()) {
388            while (currentBuffer != null) {
389                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
390                if (sent > 0) {
391                    currentBuffer.moveHead(sent);
392                    if (currentBuffer.length == 0) {
393                        if (presettle) {
394                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
395                        } else {
396                            getEndpoint().advance();
397                        }
398                        currentBuffer = null;
399                        currentDelivery = null;
400                        logicalDeliveryCount++;
401                    }
402                } else {
403                    return;
404                }
405            }
406
407            if (outbound.isEmpty()) {
408                return;
409            }
410
411            final MessageDispatch md = outbound.removeFirst();
412            try {
413
414                ActiveMQMessage temp = null;
415                if (md.getMessage() != null) {
416
417                    // Topics can dispatch the same Message to more than one consumer
418                    // so we must copy to prevent concurrent read / write to the same
419                    // message object.
420                    if (md.getDestination().isTopic()) {
421                        synchronized (md.getMessage()) {
422                            temp = (ActiveMQMessage) md.getMessage().copy();
423                        }
424                    } else {
425                        temp = (ActiveMQMessage) md.getMessage();
426                    }
427
428                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
429                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
430                    }
431                }
432
433                final ActiveMQMessage jms = temp;
434                if (jms == null) {
435                    LOG.trace("Sender:[{}] browse done.", getEndpoint().getName());
436                    // It's the end of browse signal in response to a MessagePull
437                    getEndpoint().drained();
438                    draining = false;
439                    currentCreditRequest = 0;
440                    logicalDeliveryCount = 0;
441                } else {
442                    if (LOG.isTraceEnabled()) {
443                        LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
444                                  getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
445                                  getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
446                    }
447
448                    if (draining && getEndpoint().getCredit() == 0) {
449                        LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
450                        getEndpoint().drained();
451                        draining = false;
452                        currentCreditRequest = 0;
453                        logicalDeliveryCount = 0;
454                    }
455
456                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
457                    jms.setReadOnlyBody(true);
458                    final EncodedMessage amqp = outboundTransformer.transform(jms);
459                    if (amqp != null && amqp.getLength() > 0) {
460                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
461                        if (presettle) {
462                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
463                        } else {
464                            final byte[] tag = tagCache.getNextTag();
465                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
466                        }
467                        currentDelivery.setContext(md);
468                    } else {
469                        // TODO: message could not be generated what now?
470                    }
471                }
472            } catch (Exception e) {
473                LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage());
474            }
475        }
476    }
477
478    private void settle(final Delivery delivery, final int ackType) throws Exception {
479        byte[] tag = delivery.getTag();
480        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
481            tagCache.returnTag(tag);
482        }
483
484        if (ackType == -1) {
485            // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
486            delivery.settle();
487            onMessageDispatch((MessageDispatch) delivery.getContext());
488        } else {
489            MessageDispatch md = (MessageDispatch) delivery.getContext();
490            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
491            MessageAck ack = new MessageAck();
492            ack.setConsumerId(getConsumerId());
493            ack.setFirstMessageId(md.getMessage().getMessageId());
494            ack.setLastMessageId(md.getMessage().getMessageId());
495            ack.setMessageCount(1);
496            ack.setAckType((byte) ackType);
497            ack.setDestination(md.getDestination());
498
499            DeliveryState remoteState = delivery.getRemoteState();
500            if (remoteState != null && remoteState instanceof TransactionalState) {
501                TransactionalState txState = (TransactionalState) remoteState;
502                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
503                ack.setTransactionId(txId);
504
505                // Store the message sent in this TX we might need to re-send on rollback
506                session.enlist(txId);
507                md.getMessage().setTransactionId(txId);
508                dispatchedInTx.addFirst(md);
509            }
510
511            LOG.trace("Sending Ack to ActiveMQ: {}", ack);
512
513            sendToActiveMQ(ack, new ResponseHandler() {
514                @Override
515                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
516                    if (response.isException()) {
517                        if (response.isException()) {
518                            Throwable exception = ((ExceptionResponse) response).getException();
519                            exception.printStackTrace();
520                            getEndpoint().close();
521                        }
522                    } else {
523                        delivery.settle();
524                    }
525                    session.pumpProtonToSocket();
526                }
527            });
528        }
529    }
530}