/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.protocol;

import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
 * which holds the corresponding Receiver which receives messages transferred
 * across the link from the Broker.
 *
 * An AmqpSender is in turn a message consumer subscribed to some destination
 * on the broker.  As messages are dispatched to this sender they are sent on
 * to the remote Receiver end of the link.
 */
public class AmqpSender extends AmqpAbstractLink<Sender> {

    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);

    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};

    /**
     * Pseudo ack type handed to {@link #settle(Delivery, int)} meaning: settle the
     * delivery locally and queue the message for redelivery without acking it to
     * the broker.
     */
    private static final int REDELIVER_WITHOUT_ACK = -1;

    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";

    private final ConsumerInfo consumerInfo;
    private AbstractSubscription subscription;
    private AtomicInteger prefetchExtension;
    private int currentCreditRequest;
    private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective
    private final boolean presettle;

    private boolean draining;
    private long lastDeliveredSequenceId;

    private Buffer currentBuffer;
    private Delivery currentDelivery;

    /**
     * Creates a new AmqpSender instance that manages the given Sender
     *
     * @param session
     *        the AmqpSession object that is the parent of this instance.
     * @param endpoint
     *        the AMQP Sender instance that this class manages.
     * @param consumerInfo
     *        the ConsumerInfo instance that holds configuration for this sender.
     */
    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
        super(session, endpoint);

        this.consumerInfo = consumerInfo;
        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
    }

    /**
     * Registers this sender with its session and looks up the broker side
     * subscription (and its prefetch extension counter) before opening the link.
     * The registration is skipped when the link is already closed.
     */
    @Override
    public void open() {
        if (!isClosed()) {
            session.registerSender(getConsumerId(), this);
            subscription = (AbstractSubscription) session.getConnection().lookupPrefetchSubscription(consumerInfo);
            prefetchExtension = subscription.getPrefetchExtension();
        }

        super.open();
    }

    /**
     * Removes the broker side consumer and unregisters this sender from the
     * session, then detaches the link.  Nothing is sent to the broker unless the
     * link is currently open.
     */
    @Override
    public void detach() {
        if (!isClosed() && isOpened()) {
            sendRemoveConsumer();
            session.unregisterSender(getConsumerId());
        }

        super.detach();
    }

    /**
     * Removes the broker side consumer, and for a durable consumer also the
     * subscription itself, unregisters this sender from the session and then
     * closes the link.  Nothing is sent to the broker unless the link is
     * currently open.
     */
    @Override
    public void close() {
        if (!isClosed() && isOpened()) {
            sendRemoveConsumer();

            if (consumerInfo.isDurable()) {
                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
                rsi.setConnectionId(session.getConnection().getConnectionId());
                rsi.setSubscriptionName(getEndpoint().getName());
                rsi.setClientId(session.getConnection().getClientId());

                sendToActiveMQ(rsi);
            }

            session.unregisterSender(getConsumerId());
        }

        super.close();
    }

    /**
     * Handles a flow update on the link.
     *
     * When the remote requests a drain and one is not already in progress a
     * MessagePull for the outstanding credit is sent to the broker, or if there is
     * no credit the queued messages are pumped and the link is marked drained
     * right away.  Otherwise the credit now on the link is reconciled with the
     * credit already requested from the subscription and only the difference is
     * added to the prefetch extension.
     *
     * @throws Exception if an error occurs while pumping outbound messages.
     */
    @Override
    public void flow() throws Exception {
        Link endpoint = getEndpoint();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}",
                draining, endpoint.getDrain(),
                endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription);
        }

        final int endpointCredit = endpoint.getCredit();
        if (endpoint.getDrain() && !draining) {

            if (endpointCredit > 0) {
                draining = true;

                // Now request dispatch of the drain amount, we request immediate
                // timeout and a completion message regardless so that we can know
                // when we should mark the link as drained.
                MessagePull pullRequest = new MessagePull();
                pullRequest.setConsumerId(getConsumerId());
                pullRequest.setDestination(getDestination());
                pullRequest.setTimeout(-1);
                pullRequest.setAlwaysSignalDone(true);
                pullRequest.setQuantity(endpointCredit);

                LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit);

                sendToActiveMQ(pullRequest);
            } else {
                LOG.trace("Pull case -> sending any Queued messages and marking drained");

                pumpOutbound();
                getEndpoint().drained();
                session.pumpProtonToSocket();
                currentCreditRequest = 0;
                logicalDeliveryCount = 0;
            }
        } else if (endpointCredit >= 0) {

            if (endpointCredit == 0 && currentCreditRequest != 0) {

                prefetchExtension.set(0);
                currentCreditRequest = 0;
                logicalDeliveryCount = 0;
                LOG.trace("Flow: credit 0 for sub:{}", subscription);

            } else {

                int deltaToAdd = endpointCredit;
                int logicalCredit = currentCreditRequest - logicalDeliveryCount;
                if (logicalCredit > 0) {
                    deltaToAdd -= logicalCredit;
                } else {
                    // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
                    logicalDeliveryCount = 0;
                }
                if (deltaToAdd > 0) {
                    currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
                    subscription.wakeupDestinationsForDispatch();
                    // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
                    subscription.setPrefetchSize(0);
                    LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
                }
            }
        }
    }

    /**
     * Handles a delivery state update from the remote peer and maps the remote
     * outcome onto an ack (or a redelivery) of the MessageDispatch stored as the
     * delivery context, then pumps any pending outbound messages.
     *
     * @param delivery
     *        the Delivery whose remote state was updated.
     *
     * @throws Exception if an error occurs while settling or pumping messages.
     */
    @Override
    public void delivery(Delivery delivery) throws Exception {
        MessageDispatch md = (MessageDispatch) delivery.getContext();
        DeliveryState state = delivery.getRemoteState();

        if (state instanceof TransactionalState) {
            TransactionalState txState = (TransactionalState) state;
            LOG.trace("onDelivery: TX delivery state = {}", state);
            if (txState.getOutcome() != null) {
                Outcome outcome = txState.getOutcome();
                if (outcome instanceof Accepted) {
                    if (!delivery.remotelySettled()) {
                        TransactionalState txAccepted = new TransactionalState();
                        txAccepted.setOutcome(Accepted.getInstance());
                        txAccepted.setTxnId(txState.getTxnId());

                        delivery.disposition(txAccepted);
                    }
                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        } else {
            if (state instanceof Accepted) {
                LOG.trace("onDelivery: accepted state = {}", state);
                if (!delivery.remotelySettled()) {
                    delivery.disposition(Accepted.getInstance());
                }
                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
            } else if (state instanceof Rejected) {
                // Rejection is a terminal outcome, we poison the message for dispatch to
                // the DLQ.  If a custom redelivery policy is used on the broker the message
                // can still be redelivered based on the configuration of that policy.
                LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state);
                settle(delivery, MessageAck.POSION_ACK_TYPE);
            } else if (state instanceof Released) {
                LOG.trace("onDelivery: Released state = {}", state);
                // re-deliver && don't increment the counter.
                settle(delivery, REDELIVER_WITHOUT_ACK);
            } else if (state instanceof Modified) {
                Modified modified = (Modified) state;
                if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
                    // increment delivery counter..
                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
                }
                LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
                int ackType = REDELIVER_WITHOUT_ACK;
                Boolean undeliverableHere = modified.getUndeliverableHere();
                if (undeliverableHere != null && undeliverableHere) {
                    // receiver does not want the message..
                    // perhaps we should DLQ it?
                    ackType = MessageAck.POSION_ACK_TYPE;
                }
                settle(delivery, ackType);
            }
        }

        pumpOutbound();
    }

    /**
     * Sends an individual ack, carrying the transaction id stored on the message,
     * for every message that was dispatched inside the transaction and then
     * forgets those messages.  A failed ack closes the link.
     *
     * @throws Exception if an error occurs while sending the acks.
     */
    @Override
    public void commit() throws Exception {
        if (!dispatchedInTx.isEmpty()) {
            for (MessageDispatch md : dispatchedInTx) {
                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());

                LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);

                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
                    @Override
                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                        if (response.isException()) {
                            closeOnAckFailure(response);
                        }
                        session.pumpProtonToSocket();
                    }
                });
            }

            dispatchedInTx.clear();
        }
    }

    /**
     * Puts every message that was dispatched inside the transaction back at the
     * head of the outbound queue with its redelivery counter incremented and its
     * transaction id cleared.
     *
     * @throws Exception declared by the overridden method.
     */
    @Override
    public void rollback() throws Exception {
        synchronized (outbound) {

            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());

            for (MessageDispatch dispatch : dispatchedInTx) {
                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
                dispatch.getMessage().setTransactionId(null);
                outbound.addFirst(dispatch);
            }

            dispatchedInTx.clear();
        }
    }

    /**
     * Event point for incoming message from ActiveMQ on this Sender's
     * corresponding subscription.
     *
     * @param dispatch
     *        the MessageDispatch to process and send across the link.
     *
     * @throws Exception if an error occurs while encoding the message for send.
     */
    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
        if (!isClosed()) {
            // Lock to prevent stepping on TX redelivery
            synchronized (outbound) {
                outbound.addLast(dispatch);
            }
            pumpOutbound();
            session.pumpProtonToSocket();
        }
    }

    /**
     * Called when the Broker sends a ConsumerControl command to the Consumer that
     * this sender creates to obtain messages to dispatch via the sender for this
     * end of the open link.
     *
     * @param control
     *        The ConsumerControl command to process.
     */
    public void onConsumerControl(ConsumerControl control) {
        if (control.isClose()) {
            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
            session.pumpProtonToSocket();
        }
    }

    @Override
    public String toString() {
        return "AmqpSender {" + getConsumerId() + "}";
    }

    //----- Property getters and setters -------------------------------------//

    public ConsumerId getConsumerId() {
        return consumerInfo.getConsumerId();
    }

    @Override
    public ActiveMQDestination getDestination() {
        return consumerInfo.getDestination();
    }

    @Override
    public void setDestination(ActiveMQDestination destination) {
        consumerInfo.setDestination(destination);
    }

    //----- Internal Implementation ------------------------------------------//

    /**
     * Writes as much queued data to the link as it will currently accept.
     *
     * The partially sent buffer (if any) is flushed first; once it is empty the
     * next MessageDispatch is taken from the outbound queue, encoded and made the
     * current buffer.  The method returns when the link is closed, when the
     * endpoint accepts no more bytes, or when the outbound queue is empty.  A
     * dispatch that carries no message is treated as the completion signal of a
     * MessagePull and marks the link as drained.
     *
     * @throws Exception if an error occurs while settling a pre-settled delivery.
     */
    public void pumpOutbound() throws Exception {
        while (!isClosed()) {
            while (currentBuffer != null) {
                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
                if (sent > 0) {
                    currentBuffer.moveHead(sent);
                    if (currentBuffer.length == 0) {
                        if (presettle) {
                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
                        } else {
                            getEndpoint().advance();
                        }
                        currentBuffer = null;
                        currentDelivery = null;
                        logicalDeliveryCount++;
                    }
                } else {
                    return;
                }
            }

            if (outbound.isEmpty()) {
                return;
            }

            final MessageDispatch md = outbound.removeFirst();
            try {

                ActiveMQMessage temp = null;
                if (md.getMessage() != null) {
                    // topic sub conversion needs own copy to read and any conversion (read) can
                    // compete with concurrentStoreAndDispatch marshall
                    temp = (ActiveMQMessage) md.getMessage().copy();
                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
                    }
                }

                final ActiveMQMessage jms = temp;
                if (jms == null) {
                    LOG.trace("Sender:[{}] browse done.", getEndpoint().getName());
                    // It's the end of browse signal in response to a MessagePull
                    getEndpoint().drained();
                    draining = false;
                    currentCreditRequest = 0;
                    logicalDeliveryCount = 0;
                } else {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
                                  getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
                                  getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
                    }

                    if (draining && getEndpoint().getCredit() == 0) {
                        LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
                        getEndpoint().drained();
                        draining = false;
                        currentCreditRequest = 0;
                        logicalDeliveryCount = 0;
                    }

                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
                    jms.setReadOnlyBody(true);
                    final EncodedMessage amqp = outboundTransformer.transform(jms);
                    if (amqp != null && amqp.getLength() > 0) {
                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
                        if (presettle) {
                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
                        } else {
                            final byte[] tag = tagCache.getNextTag();
                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
                        }
                        currentDelivery.setContext(md);
                    } else {
                        // TODO: message could not be generated what now?
                    }
                }
            } catch (Exception e) {
                LOG.warn("Error detected while flushing outbound messages: {} {}", e, e.getMessage());
                LOG.debug("Exception:", e);
            }
        }
    }

    /**
     * Sends the RemoveInfo command, carrying the last delivered sequence id, that
     * removes this sender's consumer from the broker.
     */
    private void sendRemoveConsumer() {
        RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
        sendToActiveMQ(removeCommand);
    }

    /**
     * Logs the error carried by a failed ack response and closes the link.
     *
     * @param response
     *        the response for which {@code isException()} returned true.
     */
    private void closeOnAckFailure(Response response) {
        Throwable exception = ((ExceptionResponse) response).getException();
        LOG.warn("Sender:[{}] ack failed on the broker, closing link", getEndpoint().getName(), exception);
        getEndpoint().close();
    }

    /**
     * Settles the given delivery and acks (or re-queues) the message it carries.
     *
     * The delivery tag is handed back to the tag cache when the remote has
     * already settled.  For {@link #REDELIVER_WITHOUT_ACK} the delivery is settled
     * immediately and the message is queued again without acking it.  For any
     * other ack type a MessageAck is sent to the broker and the delivery is
     * settled only once the broker confirmed the ack; a failed ack closes the
     * link.  When the remote state is transactional the ack is enlisted in that
     * transaction and the dispatch is remembered for commit / rollback.
     *
     * @param delivery
     *        the Delivery to settle, its context must be the MessageDispatch sent.
     * @param ackType
     *        a MessageAck ack type or {@link #REDELIVER_WITHOUT_ACK}.
     *
     * @throws Exception if an error occurs while re-dispatching the message.
     */
    private void settle(final Delivery delivery, final int ackType) throws Exception {
        byte[] tag = delivery.getTag();
        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
            tagCache.returnTag(tag);
        }

        final MessageDispatch md = (MessageDispatch) delivery.getContext();

        if (ackType == REDELIVER_WITHOUT_ACK) {
            // we are going to settle, but redeliver.. we won't yet ack to ActiveMQ
            delivery.settle();
            onMessageDispatch(md);
            return;
        }

        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        MessageAck ack = new MessageAck();
        ack.setConsumerId(getConsumerId());
        ack.setFirstMessageId(md.getMessage().getMessageId());
        ack.setLastMessageId(md.getMessage().getMessageId());
        ack.setMessageCount(1);
        ack.setAckType((byte) ackType);
        ack.setDestination(md.getDestination());

        DeliveryState remoteState = delivery.getRemoteState();
        if (remoteState instanceof TransactionalState) {
            TransactionalState txState = (TransactionalState) remoteState;
            TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
            ack.setTransactionId(txId);

            // Store the message sent in this TX we might need to re-send on rollback
            session.enlist(txId);
            md.getMessage().setTransactionId(txId);
            dispatchedInTx.addFirst(md);
        }

        LOG.trace("Sending Ack to ActiveMQ: {}", ack);

        sendToActiveMQ(ack, new ResponseHandler() {
            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                if (response.isException()) {
                    closeOnAckFailure(response);
                } else {
                    delivery.settle();
                }
                session.pumpProtonToSocket();
            }
        });
    }
}