001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import org.apache.activemq.state.CommandVisitor; 020 021/** 022 * @openwire:marshaller code="22" 023 * 024 */ 025public class MessageAck extends BaseCommand { 026 027 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; 028 029 /** 030 * Used to let the broker know that the message has been delivered to the 031 * client. Message will still be retained until an standard ack is received. 032 * This is used get the broker to send more messages past prefetch limits 033 * when an standard ack has not been sent. 034 */ 035 public static final byte DELIVERED_ACK_TYPE = 0; 036 037 /** 038 * The standard ack case where a client wants the message to be discarded. 039 */ 040 public static final byte STANDARD_ACK_TYPE = 2; 041 042 /** 043 * In case the client want's to explicitly let the broker know that a 044 * message was not processed and the message was considered a poison 045 * message. 046 */ 047 public static final byte POSION_ACK_TYPE = 1; 048 049 /** 050 * In case the client want's to explicitly let the broker know that a 051 * message was not processed and it was re-delivered to the consumer 052 * but it was not yet considered to be a poison message. The messageCount 053 * field will hold the number of times the message was re-delivered. 054 */ 055 public static final byte REDELIVERED_ACK_TYPE = 3; 056 057 /** 058 * The ack case where a client wants only an individual message to be discarded. 059 */ 060 public static final byte INDIVIDUAL_ACK_TYPE = 4; 061 062/** 063 * The ack case where a durable topic subscription does not match a selector. 064 */ 065 public static final byte UNMATCHED_ACK_TYPE = 5; 066 067 /** 068 * the case where a consumer does not dispatch because message has expired inflight 069 */ 070 public static final byte EXPIRED_ACK_TYPE = 6; 071 072 protected byte ackType; 073 protected ConsumerId consumerId; 074 protected MessageId firstMessageId; 075 protected MessageId lastMessageId; 076 protected ActiveMQDestination destination; 077 protected TransactionId transactionId; 078 protected int messageCount; 079 protected Throwable poisonCause; 080 081 protected transient String consumerKey; 082 083 public MessageAck() { 084 } 085 086 public MessageAck(MessageDispatch md, byte ackType, int messageCount) { 087 this.ackType = ackType; 088 this.consumerId = md.getConsumerId(); 089 this.destination = md.getDestination(); 090 this.lastMessageId = md.getMessage().getMessageId(); 091 this.messageCount = messageCount; 092 } 093 094 public MessageAck(Message message, byte ackType, int messageCount) { 095 this.ackType = ackType; 096 this.destination = message.getDestination(); 097 this.lastMessageId = message.getMessageId(); 098 this.messageCount = messageCount; 099 } 100 101 public void copy(MessageAck copy) { 102 super.copy(copy); 103 copy.firstMessageId = firstMessageId; 104 copy.lastMessageId = lastMessageId; 105 copy.destination = destination; 106 copy.transactionId = transactionId; 107 copy.ackType = ackType; 108 copy.consumerId = consumerId; 109 } 110 111 public byte getDataStructureType() { 112 return DATA_STRUCTURE_TYPE; 113 } 114 115 public boolean isMessageAck() { 116 return true; 117 } 118 119 public boolean isPoisonAck() { 120 return ackType == POSION_ACK_TYPE; 121 } 122 123 public boolean isStandardAck() { 124 return ackType == STANDARD_ACK_TYPE; 125 } 126 127 public boolean isDeliveredAck() { 128 return ackType == DELIVERED_ACK_TYPE; 129 } 130 131 public boolean isRedeliveredAck() { 132 return ackType == REDELIVERED_ACK_TYPE; 133 } 134 135 public boolean isIndividualAck() { 136 return ackType == INDIVIDUAL_ACK_TYPE; 137 } 138 139 public boolean isUnmatchedAck() { 140 return ackType == UNMATCHED_ACK_TYPE; 141 } 142 143 public boolean isExpiredAck() { 144 return ackType == EXPIRED_ACK_TYPE; 145 } 146 147 /** 148 * @openwire:property version=1 cache=true 149 */ 150 public ActiveMQDestination getDestination() { 151 return destination; 152 } 153 154 public void setDestination(ActiveMQDestination destination) { 155 this.destination = destination; 156 } 157 158 /** 159 * @openwire:property version=1 cache=true 160 */ 161 public TransactionId getTransactionId() { 162 return transactionId; 163 } 164 165 public void setTransactionId(TransactionId transactionId) { 166 this.transactionId = transactionId; 167 } 168 169 public boolean isInTransaction() { 170 return transactionId != null; 171 } 172 173 /** 174 * @openwire:property version=1 cache=true 175 */ 176 public ConsumerId getConsumerId() { 177 return consumerId; 178 } 179 180 public void setConsumerId(ConsumerId consumerId) { 181 this.consumerId = consumerId; 182 } 183 184 /** 185 * @openwire:property version=1 186 */ 187 public byte getAckType() { 188 return ackType; 189 } 190 191 public void setAckType(byte ackType) { 192 this.ackType = ackType; 193 } 194 195 /** 196 * @openwire:property version=1 197 */ 198 public MessageId getFirstMessageId() { 199 return firstMessageId; 200 } 201 202 public void setFirstMessageId(MessageId firstMessageId) { 203 this.firstMessageId = firstMessageId; 204 } 205 206 /** 207 * @openwire:property version=1 208 */ 209 public MessageId getLastMessageId() { 210 return lastMessageId; 211 } 212 213 public void setLastMessageId(MessageId lastMessageId) { 214 this.lastMessageId = lastMessageId; 215 } 216 217 /** 218 * The number of messages being acknowledged in the range. 219 * 220 * @openwire:property version=1 221 */ 222 public int getMessageCount() { 223 return messageCount; 224 } 225 226 public void setMessageCount(int messageCount) { 227 this.messageCount = messageCount; 228 } 229 230 /** 231 * The cause of a poison ack, if a message listener 232 * throws an exception it will be recorded here 233 * 234 * @openwire:property version=7 235 */ 236 public Throwable getPoisonCause() { 237 return poisonCause; 238 } 239 240 public void setPoisonCause(Throwable poisonCause) { 241 this.poisonCause = poisonCause; 242 } 243 244 public Response visit(CommandVisitor visitor) throws Exception { 245 return visitor.processMessageAck(this); 246 } 247 248 /** 249 * A helper method to allow a single message ID to be acknowledged 250 */ 251 public void setMessageID(MessageId messageID) { 252 setFirstMessageId(messageID); 253 setLastMessageId(messageID); 254 setMessageCount(1); 255 } 256 257}