001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; 020 021import java.io.IOException; 022import java.util.LinkedList; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.apache.activemq.broker.region.AbstractSubscription; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ActiveMQMessage; 028import org.apache.activemq.command.ConsumerControl; 029import org.apache.activemq.command.ConsumerId; 030import org.apache.activemq.command.ConsumerInfo; 031import org.apache.activemq.command.ExceptionResponse; 032import org.apache.activemq.command.LocalTransactionId; 033import org.apache.activemq.command.MessageAck; 034import org.apache.activemq.command.MessageDispatch; 035import org.apache.activemq.command.MessagePull; 036import org.apache.activemq.command.RemoveInfo; 037import org.apache.activemq.command.RemoveSubscriptionInfo; 038import org.apache.activemq.command.Response; 039import org.apache.activemq.command.TransactionId; 040import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 041import org.apache.activemq.transport.amqp.ResponseHandler; 042import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; 043import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer; 044import org.apache.activemq.transport.amqp.message.EncodedMessage; 045import org.apache.activemq.transport.amqp.message.OutboundTransformer; 046import org.apache.qpid.proton.amqp.messaging.Accepted; 047import org.apache.qpid.proton.amqp.messaging.Modified; 048import org.apache.qpid.proton.amqp.messaging.Outcome; 049import org.apache.qpid.proton.amqp.messaging.Rejected; 050import org.apache.qpid.proton.amqp.messaging.Released; 051import org.apache.qpid.proton.amqp.transaction.TransactionalState; 052import org.apache.qpid.proton.amqp.transport.AmqpError; 053import org.apache.qpid.proton.amqp.transport.DeliveryState; 054import org.apache.qpid.proton.amqp.transport.ErrorCondition; 055import org.apache.qpid.proton.amqp.transport.SenderSettleMode; 056import org.apache.qpid.proton.engine.Delivery; 057import org.apache.qpid.proton.engine.Link; 058import org.apache.qpid.proton.engine.Sender; 059import org.fusesource.hawtbuf.Buffer; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer 065 * which holds the corresponding Receiver which receives messages transfered 066 * across the link from the Broker. 067 * 068 * An AmqpSender is in turn a message consumer subscribed to some destination 069 * on the broker. As messages are dispatched to this sender that are sent on 070 * to the remote Receiver end of the lin. 071 */ 072public class AmqpSender extends AmqpAbstractLink<Sender> { 073 074 private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class); 075 076 private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; 077 078 private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE); 079 private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); 080 private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>(); 081 private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); 082 private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; 083 084 private final ConsumerInfo consumerInfo; 085 private AbstractSubscription subscription; 086 private AtomicInteger prefetchExtension; 087 private int currentCreditRequest; 088 private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective 089 private final boolean presettle; 090 091 private boolean draining; 092 private long lastDeliveredSequenceId; 093 094 private Buffer currentBuffer; 095 private Delivery currentDelivery; 096 097 /** 098 * Creates a new AmqpSender instance that manages the given Sender 099 * 100 * @param session 101 * the AmqpSession object that is the parent of this instance. 102 * @param endpoint 103 * the AMQP Sender instance that this class manages. 104 * @param consumerInfo 105 * the ConsumerInfo instance that holds configuration for this sender. 106 */ 107 public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) { 108 super(session, endpoint); 109 110 this.consumerInfo = consumerInfo; 111 this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; 112 } 113 114 @Override 115 public void open() { 116 if (!isClosed()) { 117 session.registerSender(getConsumerId(), this); 118 subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo); 119 prefetchExtension = subscription.getPrefetchExtension(); 120 } 121 122 super.open(); 123 } 124 125 @Override 126 public void detach() { 127 if (!isClosed() && isOpened()) { 128 RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); 129 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 130 sendToActiveMQ(removeCommand); 131 132 session.unregisterSender(getConsumerId()); 133 } 134 135 super.detach(); 136 } 137 138 @Override 139 public void close() { 140 if (!isClosed() && isOpened()) { 141 RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); 142 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 143 sendToActiveMQ(removeCommand); 144 145 if (consumerInfo.isDurable()) { 146 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 147 rsi.setConnectionId(session.getConnection().getConnectionId()); 148 rsi.setSubscriptionName(getEndpoint().getName()); 149 rsi.setClientId(session.getConnection().getClientId()); 150 151 sendToActiveMQ(rsi); 152 } 153 154 session.unregisterSender(getConsumerId()); 155 } 156 157 super.close(); 158 } 159 160 @Override 161 public void flow() throws Exception { 162 Link endpoint = getEndpoint(); 163 if (LOG.isTraceEnabled()) { 164 LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}", 165 draining, endpoint.getDrain(), 166 endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription); 167 } 168 169 final int endpointCredit = endpoint.getCredit(); 170 if (endpoint.getDrain() && !draining) { 171 172 if (endpointCredit > 0) { 173 draining = true; 174 175 // Now request dispatch of the drain amount, we request immediate 176 // timeout and an completion message regardless so that we can know 177 // when we should marked the link as drained. 178 MessagePull pullRequest = new MessagePull(); 179 pullRequest.setConsumerId(getConsumerId()); 180 pullRequest.setDestination(getDestination()); 181 pullRequest.setTimeout(-1); 182 pullRequest.setAlwaysSignalDone(true); 183 pullRequest.setQuantity(endpointCredit); 184 185 LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit); 186 187 sendToActiveMQ(pullRequest); 188 } else { 189 LOG.trace("Pull case -> sending any Queued messages and marking drained"); 190 191 pumpOutbound(); 192 getEndpoint().drained(); 193 session.pumpProtonToSocket(); 194 currentCreditRequest = 0; 195 logicalDeliveryCount = 0; 196 } 197 } else if (endpointCredit >= 0) { 198 199 if (endpointCredit == 0 && currentCreditRequest != 0) { 200 201 prefetchExtension.set(0); 202 currentCreditRequest = 0; 203 logicalDeliveryCount = 0; 204 LOG.trace("Flow: credit 0 for sub:" + subscription); 205 206 } else { 207 208 int deltaToAdd = endpointCredit; 209 int logicalCredit = currentCreditRequest - logicalDeliveryCount; 210 if (logicalCredit > 0) { 211 deltaToAdd -= logicalCredit; 212 } else { 213 // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative 214 logicalDeliveryCount = 0; 215 } 216 if (deltaToAdd > 0) { 217 currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd); 218 subscription.wakeupDestinationsForDispatch(); 219 // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch) 220 subscription.setPrefetchSize(0); 221 LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription); 222 } 223 } 224 } 225 } 226 227 @Override 228 public void delivery(Delivery delivery) throws Exception { 229 MessageDispatch md = (MessageDispatch) delivery.getContext(); 230 DeliveryState state = delivery.getRemoteState(); 231 232 if (state instanceof TransactionalState) { 233 TransactionalState txState = (TransactionalState) state; 234 LOG.trace("onDelivery: TX delivery state = {}", state); 235 if (txState.getOutcome() != null) { 236 Outcome outcome = txState.getOutcome(); 237 if (outcome instanceof Accepted) { 238 if (!delivery.remotelySettled()) { 239 TransactionalState txAccepted = new TransactionalState(); 240 txAccepted.setOutcome(Accepted.getInstance()); 241 txAccepted.setTxnId(((TransactionalState) state).getTxnId()); 242 243 delivery.disposition(txAccepted); 244 } 245 settle(delivery, MessageAck.DELIVERED_ACK_TYPE); 246 } 247 } 248 } else { 249 if (state instanceof Accepted) { 250 LOG.trace("onDelivery: accepted state = {}", state); 251 if (!delivery.remotelySettled()) { 252 delivery.disposition(new Accepted()); 253 } 254 settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE); 255 } else if (state instanceof Rejected) { 256 // re-deliver /w incremented delivery counter. 257 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); 258 LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter()); 259 settle(delivery, -1); 260 } else if (state instanceof Released) { 261 LOG.trace("onDelivery: Released state = {}", state); 262 // re-deliver && don't increment the counter. 263 settle(delivery, -1); 264 } else if (state instanceof Modified) { 265 Modified modified = (Modified) state; 266 if (Boolean.TRUE.equals(modified.getDeliveryFailed())) { 267 // increment delivery counter.. 268 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); 269 } 270 LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter()); 271 byte ackType = -1; 272 Boolean undeliverableHere = modified.getUndeliverableHere(); 273 if (undeliverableHere != null && undeliverableHere) { 274 // receiver does not want the message.. 275 // perhaps we should DLQ it? 276 ackType = MessageAck.POSION_ACK_TYPE; 277 } 278 settle(delivery, ackType); 279 } 280 } 281 282 pumpOutbound(); 283 } 284 285 @Override 286 public void commit() throws Exception { 287 if (!dispatchedInTx.isEmpty()) { 288 for (MessageDispatch md : dispatchedInTx) { 289 MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 290 pendingTxAck.setFirstMessageId(md.getMessage().getMessageId()); 291 pendingTxAck.setTransactionId(md.getMessage().getTransactionId()); 292 293 LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck); 294 295 sendToActiveMQ(pendingTxAck, new ResponseHandler() { 296 @Override 297 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 298 if (response.isException()) { 299 Throwable exception = ((ExceptionResponse) response).getException(); 300 exception.printStackTrace(); 301 getEndpoint().close(); 302 } 303 session.pumpProtonToSocket(); 304 } 305 }); 306 } 307 308 dispatchedInTx.clear(); 309 } 310 } 311 312 @Override 313 public void rollback() throws Exception { 314 synchronized (outbound) { 315 316 LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size()); 317 318 for (MessageDispatch dispatch : dispatchedInTx) { 319 dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1); 320 dispatch.getMessage().setTransactionId(null); 321 outbound.addFirst(dispatch); 322 } 323 324 dispatchedInTx.clear(); 325 } 326 } 327 328 /** 329 * Event point for incoming message from ActiveMQ on this Sender's 330 * corresponding subscription. 331 * 332 * @param dispatch 333 * the MessageDispatch to process and send across the link. 334 * 335 * @throws Exception if an error occurs while encoding the message for send. 336 */ 337 public void onMessageDispatch(MessageDispatch dispatch) throws Exception { 338 if (!isClosed()) { 339 // Lock to prevent stepping on TX redelivery 340 synchronized (outbound) { 341 outbound.addLast(dispatch); 342 } 343 pumpOutbound(); 344 session.pumpProtonToSocket(); 345 } 346 } 347 348 /** 349 * Called when the Broker sends a ConsumerControl command to the Consumer that 350 * this sender creates to obtain messages to dispatch via the sender for this 351 * end of the open link. 352 * 353 * @param control 354 * The ConsumerControl command to process. 355 */ 356 public void onConsumerControl(ConsumerControl control) { 357 if (control.isClose()) { 358 close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed")); 359 session.pumpProtonToSocket(); 360 } 361 } 362 363 @Override 364 public String toString() { 365 return "AmqpSender {" + getConsumerId() + "}"; 366 } 367 368 //----- Property getters and setters -------------------------------------// 369 370 public ConsumerId getConsumerId() { 371 return consumerInfo.getConsumerId(); 372 } 373 374 @Override 375 public ActiveMQDestination getDestination() { 376 return consumerInfo.getDestination(); 377 } 378 379 @Override 380 public void setDestination(ActiveMQDestination destination) { 381 consumerInfo.setDestination(destination); 382 } 383 384 //----- Internal Implementation ------------------------------------------// 385 386 public void pumpOutbound() throws Exception { 387 while (!isClosed()) { 388 while (currentBuffer != null) { 389 int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length); 390 if (sent > 0) { 391 currentBuffer.moveHead(sent); 392 if (currentBuffer.length == 0) { 393 if (presettle) { 394 settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE); 395 } else { 396 getEndpoint().advance(); 397 } 398 currentBuffer = null; 399 currentDelivery = null; 400 logicalDeliveryCount++; 401 } 402 } else { 403 return; 404 } 405 } 406 407 if (outbound.isEmpty()) { 408 return; 409 } 410 411 final MessageDispatch md = outbound.removeFirst(); 412 try { 413 414 ActiveMQMessage temp = null; 415 if (md.getMessage() != null) { 416 417 // Topics can dispatch the same Message to more than one consumer 418 // so we must copy to prevent concurrent read / write to the same 419 // message object. 420 if (md.getDestination().isTopic()) { 421 synchronized (md.getMessage()) { 422 temp = (ActiveMQMessage) md.getMessage().copy(); 423 } 424 } else { 425 temp = (ActiveMQMessage) md.getMessage(); 426 } 427 428 if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { 429 temp.setProperty(MESSAGE_FORMAT_KEY, 0); 430 } 431 } 432 433 final ActiveMQMessage jms = temp; 434 if (jms == null) { 435 LOG.trace("Sender:[{}] browse done.", getEndpoint().getName()); 436 // It's the end of browse signal in response to a MessagePull 437 getEndpoint().drained(); 438 draining = false; 439 currentCreditRequest = 0; 440 logicalDeliveryCount = 0; 441 } else { 442 if (LOG.isTraceEnabled()) { 443 LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}", 444 getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(), 445 getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued()); 446 } 447 448 if (draining && getEndpoint().getCredit() == 0) { 449 LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName()); 450 getEndpoint().drained(); 451 draining = false; 452 currentCreditRequest = 0; 453 logicalDeliveryCount = 0; 454 } 455 456 jms.setRedeliveryCounter(md.getRedeliveryCounter()); 457 jms.setReadOnlyBody(true); 458 final EncodedMessage amqp = outboundTransformer.transform(jms); 459 if (amqp != null && amqp.getLength() > 0) { 460 currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()); 461 if (presettle) { 462 currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0); 463 } else { 464 final byte[] tag = tagCache.getNextTag(); 465 currentDelivery = getEndpoint().delivery(tag, 0, tag.length); 466 } 467 currentDelivery.setContext(md); 468 } else { 469 // TODO: message could not be generated what now? 470 } 471 } 472 } catch (Exception e) { 473 LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage()); 474 } 475 } 476 } 477 478 private void settle(final Delivery delivery, final int ackType) throws Exception { 479 byte[] tag = delivery.getTag(); 480 if (tag != null && tag.length > 0 && delivery.remotelySettled()) { 481 tagCache.returnTag(tag); 482 } 483 484 if (ackType == -1) { 485 // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ 486 delivery.settle(); 487 onMessageDispatch((MessageDispatch) delivery.getContext()); 488 } else { 489 MessageDispatch md = (MessageDispatch) delivery.getContext(); 490 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 491 MessageAck ack = new MessageAck(); 492 ack.setConsumerId(getConsumerId()); 493 ack.setFirstMessageId(md.getMessage().getMessageId()); 494 ack.setLastMessageId(md.getMessage().getMessageId()); 495 ack.setMessageCount(1); 496 ack.setAckType((byte) ackType); 497 ack.setDestination(md.getDestination()); 498 499 DeliveryState remoteState = delivery.getRemoteState(); 500 if (remoteState != null && remoteState instanceof TransactionalState) { 501 TransactionalState txState = (TransactionalState) remoteState; 502 TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); 503 ack.setTransactionId(txId); 504 505 // Store the message sent in this TX we might need to re-send on rollback 506 session.enlist(txId); 507 md.getMessage().setTransactionId(txId); 508 dispatchedInTx.addFirst(md); 509 } 510 511 LOG.trace("Sending Ack to ActiveMQ: {}", ack); 512 513 sendToActiveMQ(ack, new ResponseHandler() { 514 @Override 515 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 516 if (response.isException()) { 517 if (response.isException()) { 518 Throwable exception = ((ExceptionResponse) response).getException(); 519 exception.printStackTrace(); 520 getEndpoint().close(); 521 } 522 } else { 523 delivery.settle(); 524 } 525 session.pumpProtonToSocket(); 526 } 527 }); 528 } 529 } 530}