001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; 020 021import java.io.IOException; 022import java.util.LinkedList; 023 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.ActiveMQMessage; 026import org.apache.activemq.command.ConsumerControl; 027import org.apache.activemq.command.ConsumerId; 028import org.apache.activemq.command.ConsumerInfo; 029import org.apache.activemq.command.ExceptionResponse; 030import org.apache.activemq.command.LocalTransactionId; 031import org.apache.activemq.command.MessageAck; 032import org.apache.activemq.command.MessageDispatch; 033import org.apache.activemq.command.MessagePull; 034import org.apache.activemq.command.RemoveInfo; 035import org.apache.activemq.command.RemoveSubscriptionInfo; 036import org.apache.activemq.command.Response; 037import org.apache.activemq.command.TransactionId; 038import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 039import org.apache.activemq.transport.amqp.ResponseHandler; 040import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; 041import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer; 042import org.apache.activemq.transport.amqp.message.EncodedMessage; 043import org.apache.activemq.transport.amqp.message.OutboundTransformer; 044import org.apache.qpid.proton.amqp.messaging.Accepted; 045import org.apache.qpid.proton.amqp.messaging.Modified; 046import org.apache.qpid.proton.amqp.messaging.Outcome; 047import org.apache.qpid.proton.amqp.messaging.Rejected; 048import org.apache.qpid.proton.amqp.messaging.Released; 049import org.apache.qpid.proton.amqp.transaction.TransactionalState; 050import org.apache.qpid.proton.amqp.transport.AmqpError; 051import org.apache.qpid.proton.amqp.transport.DeliveryState; 052import org.apache.qpid.proton.amqp.transport.ErrorCondition; 053import org.apache.qpid.proton.amqp.transport.SenderSettleMode; 054import org.apache.qpid.proton.engine.Delivery; 055import org.apache.qpid.proton.engine.Sender; 056import org.fusesource.hawtbuf.Buffer; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060/** 061 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer 062 * which holds the corresponding Receiver which receives messages transfered 063 * across the link from the Broker. 064 * 065 * An AmqpSender is in turn a message consumer subscribed to some destination 066 * on the broker. As messages are dispatched to this sender that are sent on 067 * to the remote Receiver end of the lin. 068 */ 069public class AmqpSender extends AmqpAbstractLink<Sender> { 070 071 private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class); 072 073 private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; 074 075 private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE); 076 private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); 077 private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>(); 078 private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); 079 private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; 080 081 private final ConsumerInfo consumerInfo; 082 private final boolean presettle; 083 084 private boolean draining; 085 private long lastDeliveredSequenceId; 086 087 private Buffer currentBuffer; 088 private Delivery currentDelivery; 089 090 /** 091 * Creates a new AmqpSender instance that manages the given Sender 092 * 093 * @param session 094 * the AmqpSession object that is the parent of this instance. 095 * @param endpoint 096 * the AMQP Sender instance that this class manages. 097 * @param consumerInfo 098 * the ConsumerInfo instance that holds configuration for this sender. 099 */ 100 public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) { 101 super(session, endpoint); 102 103 this.consumerInfo = consumerInfo; 104 this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; 105 } 106 107 @Override 108 public void open() { 109 if (!isClosed()) { 110 session.registerSender(getConsumerId(), this); 111 } 112 113 super.open(); 114 } 115 116 @Override 117 public void detach() { 118 if (!isClosed() && isOpened()) { 119 RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); 120 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 121 sendToActiveMQ(removeCommand); 122 123 session.unregisterSender(getConsumerId()); 124 } 125 126 super.detach(); 127 } 128 129 @Override 130 public void close() { 131 if (!isClosed() && isOpened()) { 132 RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); 133 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 134 sendToActiveMQ(removeCommand); 135 136 if (consumerInfo.isDurable()) { 137 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 138 rsi.setConnectionId(session.getConnection().getConnectionId()); 139 rsi.setSubscriptionName(getEndpoint().getName()); 140 rsi.setClientId(session.getConnection().getClientId()); 141 142 sendToActiveMQ(rsi); 143 } 144 145 session.unregisterSender(getConsumerId()); 146 } 147 148 super.close(); 149 } 150 151 @Override 152 public void flow() throws Exception { 153 if (LOG.isTraceEnabled()) { 154 LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}", 155 draining, getEndpoint().getDrain(), 156 getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued()); 157 } 158 159 if (getEndpoint().getDrain() && !draining) { 160 161 // Revert to a pull consumer. 162 ConsumerControl control = new ConsumerControl(); 163 control.setConsumerId(getConsumerId()); 164 control.setDestination(getDestination()); 165 control.setPrefetch(0); 166 167 LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output"); 168 169 sendToActiveMQ(control); 170 171 if (endpoint.getCredit() > 0) { 172 draining = true; 173 174 // Now request dispatch of the drain amount, we request immediate 175 // timeout and an completion message regardless so that we can know 176 // when we should marked the link as drained. 177 MessagePull pullRequest = new MessagePull(); 178 pullRequest.setConsumerId(getConsumerId()); 179 pullRequest.setDestination(getDestination()); 180 pullRequest.setTimeout(-1); 181 pullRequest.setAlwaysSignalDone(true); 182 pullRequest.setQuantity(endpoint.getCredit()); 183 184 LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit()); 185 186 sendToActiveMQ(pullRequest); 187 } else { 188 LOG.trace("Pull case -> sending any Queued messages and marking drained"); 189 190 pumpOutbound(); 191 getEndpoint().drained(); 192 session.pumpProtonToSocket(); 193 } 194 } else { 195 ConsumerControl control = new ConsumerControl(); 196 control.setConsumerId(getConsumerId()); 197 control.setDestination(getDestination()); 198 control.setPrefetch(getEndpoint().getCredit()); 199 200 LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch()); 201 202 sendToActiveMQ(control); 203 } 204 } 205 206 @Override 207 public void delivery(Delivery delivery) throws Exception { 208 MessageDispatch md = (MessageDispatch) delivery.getContext(); 209 DeliveryState state = delivery.getRemoteState(); 210 211 if (state instanceof TransactionalState) { 212 TransactionalState txState = (TransactionalState) state; 213 LOG.trace("onDelivery: TX delivery state = {}", state); 214 if (txState.getOutcome() != null) { 215 Outcome outcome = txState.getOutcome(); 216 if (outcome instanceof Accepted) { 217 if (!delivery.remotelySettled()) { 218 TransactionalState txAccepted = new TransactionalState(); 219 txAccepted.setOutcome(Accepted.getInstance()); 220 txAccepted.setTxnId(((TransactionalState) state).getTxnId()); 221 222 delivery.disposition(txAccepted); 223 } 224 settle(delivery, MessageAck.DELIVERED_ACK_TYPE); 225 } 226 } 227 } else { 228 if (state instanceof Accepted) { 229 LOG.trace("onDelivery: accepted state = {}", state); 230 if (!delivery.remotelySettled()) { 231 delivery.disposition(new Accepted()); 232 } 233 settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE); 234 } else if (state instanceof Rejected) { 235 // re-deliver /w incremented delivery counter. 236 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); 237 LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter()); 238 settle(delivery, -1); 239 } else if (state instanceof Released) { 240 LOG.trace("onDelivery: Released state = {}", state); 241 // re-deliver && don't increment the counter. 242 settle(delivery, -1); 243 } else if (state instanceof Modified) { 244 Modified modified = (Modified) state; 245 if (Boolean.TRUE.equals(modified.getDeliveryFailed())) { 246 // increment delivery counter.. 247 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); 248 } 249 LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter()); 250 byte ackType = -1; 251 Boolean undeliverableHere = modified.getUndeliverableHere(); 252 if (undeliverableHere != null && undeliverableHere) { 253 // receiver does not want the message.. 254 // perhaps we should DLQ it? 255 ackType = MessageAck.POSION_ACK_TYPE; 256 } 257 settle(delivery, ackType); 258 } 259 } 260 261 pumpOutbound(); 262 } 263 264 @Override 265 public void commit() throws Exception { 266 if (!dispatchedInTx.isEmpty()) { 267 for (MessageDispatch md : dispatchedInTx) { 268 MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 269 pendingTxAck.setFirstMessageId(md.getMessage().getMessageId()); 270 pendingTxAck.setTransactionId(md.getMessage().getTransactionId()); 271 272 LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck); 273 274 sendToActiveMQ(pendingTxAck, new ResponseHandler() { 275 @Override 276 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 277 if (response.isException()) { 278 Throwable exception = ((ExceptionResponse) response).getException(); 279 exception.printStackTrace(); 280 getEndpoint().close(); 281 } 282 session.pumpProtonToSocket(); 283 } 284 }); 285 } 286 287 dispatchedInTx.clear(); 288 } 289 } 290 291 @Override 292 public void rollback() throws Exception { 293 synchronized (outbound) { 294 295 LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size()); 296 297 for (MessageDispatch dispatch : dispatchedInTx) { 298 dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1); 299 dispatch.getMessage().setTransactionId(null); 300 outbound.addFirst(dispatch); 301 } 302 303 dispatchedInTx.clear(); 304 } 305 } 306 307 /** 308 * Event point for incoming message from ActiveMQ on this Sender's 309 * corresponding subscription. 310 * 311 * @param dispatch 312 * the MessageDispatch to process and send across the link. 313 * 314 * @throws Exception if an error occurs while encoding the message for send. 315 */ 316 public void onMessageDispatch(MessageDispatch dispatch) throws Exception { 317 if (!isClosed()) { 318 // Lock to prevent stepping on TX redelivery 319 synchronized (outbound) { 320 outbound.addLast(dispatch); 321 } 322 pumpOutbound(); 323 session.pumpProtonToSocket(); 324 } 325 } 326 327 /** 328 * Called when the Broker sends a ConsumerControl command to the Consumer that 329 * this sender creates to obtain messages to dispatch via the sender for this 330 * end of the open link. 331 * 332 * @param control 333 * The ConsumerControl command to process. 334 */ 335 public void onConsumerControl(ConsumerControl control) { 336 if (control.isClose()) { 337 close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed")); 338 session.pumpProtonToSocket(); 339 } 340 } 341 342 @Override 343 public String toString() { 344 return "AmqpSender {" + getConsumerId() + "}"; 345 } 346 347 //----- Property getters and setters -------------------------------------// 348 349 public ConsumerId getConsumerId() { 350 return consumerInfo.getConsumerId(); 351 } 352 353 @Override 354 public ActiveMQDestination getDestination() { 355 return consumerInfo.getDestination(); 356 } 357 358 @Override 359 public void setDestination(ActiveMQDestination destination) { 360 consumerInfo.setDestination(destination); 361 } 362 363 //----- Internal Implementation ------------------------------------------// 364 365 public void pumpOutbound() throws Exception { 366 while (!isClosed()) { 367 while (currentBuffer != null) { 368 int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length); 369 if (sent > 0) { 370 currentBuffer.moveHead(sent); 371 if (currentBuffer.length == 0) { 372 if (presettle) { 373 settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE); 374 } else { 375 getEndpoint().advance(); 376 } 377 currentBuffer = null; 378 currentDelivery = null; 379 } 380 } else { 381 return; 382 } 383 } 384 385 if (outbound.isEmpty()) { 386 return; 387 } 388 389 final MessageDispatch md = outbound.removeFirst(); 390 try { 391 392 ActiveMQMessage temp = null; 393 if (md.getMessage() != null) { 394 395 // Topics can dispatch the same Message to more than one consumer 396 // so we must copy to prevent concurrent read / write to the same 397 // message object. 398 if (md.getDestination().isTopic()) { 399 synchronized (md.getMessage()) { 400 temp = (ActiveMQMessage) md.getMessage().copy(); 401 } 402 } else { 403 temp = (ActiveMQMessage) md.getMessage(); 404 } 405 406 if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { 407 temp.setProperty(MESSAGE_FORMAT_KEY, 0); 408 } 409 } 410 411 final ActiveMQMessage jms = temp; 412 if (jms == null) { 413 LOG.trace("Sender:[{}] browse done.", getEndpoint().getName()); 414 // It's the end of browse signal in response to a MessagePull 415 getEndpoint().drained(); 416 draining = false; 417 } else { 418 if (LOG.isTraceEnabled()) { 419 LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}", 420 getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(), 421 getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued()); 422 } 423 424 if (draining && getEndpoint().getCredit() == 0) { 425 LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName()); 426 getEndpoint().drained(); 427 draining = false; 428 } 429 430 jms.setRedeliveryCounter(md.getRedeliveryCounter()); 431 jms.setReadOnlyBody(true); 432 final EncodedMessage amqp = outboundTransformer.transform(jms); 433 if (amqp != null && amqp.getLength() > 0) { 434 currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()); 435 if (presettle) { 436 currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0); 437 } else { 438 final byte[] tag = tagCache.getNextTag(); 439 currentDelivery = getEndpoint().delivery(tag, 0, tag.length); 440 } 441 currentDelivery.setContext(md); 442 } else { 443 // TODO: message could not be generated what now? 444 } 445 } 446 } catch (Exception e) { 447 LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage()); 448 } 449 } 450 } 451 452 private void settle(final Delivery delivery, final int ackType) throws Exception { 453 byte[] tag = delivery.getTag(); 454 if (tag != null && tag.length > 0 && delivery.remotelySettled()) { 455 tagCache.returnTag(tag); 456 } 457 458 int newCredit = Math.max(0, getEndpoint().getCredit() - 1); 459 LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.", 460 getEndpoint().getName(), newCredit); 461 462 ConsumerControl control = new ConsumerControl(); 463 control.setConsumerId(getConsumerId()); 464 control.setDestination(getDestination()); 465 control.setPrefetch(newCredit); 466 467 sendToActiveMQ(control); 468 469 if (ackType == -1) { 470 // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ 471 delivery.settle(); 472 onMessageDispatch((MessageDispatch) delivery.getContext()); 473 } else { 474 MessageDispatch md = (MessageDispatch) delivery.getContext(); 475 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 476 MessageAck ack = new MessageAck(); 477 ack.setConsumerId(getConsumerId()); 478 ack.setFirstMessageId(md.getMessage().getMessageId()); 479 ack.setLastMessageId(md.getMessage().getMessageId()); 480 ack.setMessageCount(1); 481 ack.setAckType((byte) ackType); 482 ack.setDestination(md.getDestination()); 483 484 DeliveryState remoteState = delivery.getRemoteState(); 485 if (remoteState != null && remoteState instanceof TransactionalState) { 486 TransactionalState txState = (TransactionalState) remoteState; 487 TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId())); 488 ack.setTransactionId(txId); 489 490 // Store the message sent in this TX we might need to re-send on rollback 491 session.enlist(txId); 492 md.getMessage().setTransactionId(txId); 493 dispatchedInTx.addFirst(md); 494 } 495 496 LOG.trace("Sending Ack to ActiveMQ: {}", ack); 497 498 sendToActiveMQ(ack, new ResponseHandler() { 499 @Override 500 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 501 if (response.isException()) { 502 if (response.isException()) { 503 Throwable exception = ((ExceptionResponse) response).getException(); 504 exception.printStackTrace(); 505 getEndpoint().close(); 506 } 507 } else { 508 delivery.settle(); 509 } 510 session.pumpProtonToSocket(); 511 } 512 }); 513 } 514 } 515}