/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.state.CommandVisitor;

/**
 * Command describing a consumer: its id, destination, prefetch settings,
 * selector and related options. When visited it is dispatched to
 * {@link CommandVisitor#processAddConsumer(ConsumerInfo)}.
 * <p>
 * Two instances are considered equal when their consumer ids are equal.
 *
 * @openwire:marshaller code="5"
 *
 */
public class ConsumerInfo extends BaseCommand implements TransientInitializer {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;

    public static final byte HIGH_PRIORITY = 10;
    public static final byte NORMAL_PRIORITY = 0;
    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
    public static final byte LOW_PRIORITY = -10;

    protected ConsumerId consumerId;
    protected ActiveMQDestination destination;
    protected int prefetchSize;
    protected int maximumPendingMessageLimit;
    protected boolean browser;
    protected boolean dispatchAsync;
    protected String selector;
    protected String clientId;
    protected String subscriptionName;
    protected boolean noLocal;
    protected boolean exclusive;
    protected boolean retroactive;
    protected byte priority;
    protected BrokerId[] brokerPath;
    protected boolean optimizedAcknowledge;
    // used by the broker
    protected transient int currentPrefetchSize;
    // if true, the consumer will not send range acks.
    protected boolean noRangeAcks;

    protected BooleanExpression additionalPredicate;
    // this subscription originated from a network connection
    protected transient boolean networkSubscription;
    // the original consumerId(s); guarded by "this" in the synchronized accessors below
    protected transient List<ConsumerId> networkConsumerIds;

    // not marshalled, populated from RemoveInfo, the last message delivered, used
    // to suppress redelivery on prefetched messages after close
    private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
    // per-destination counters; typed as ConcurrentHashMap so the atomic
    // putIfAbsent used by incrementAssignedGroupCount is available
    private transient ConcurrentHashMap<ActiveMQDestination, AtomicLong> assignedGroupCount = new ConcurrentHashMap<>();

    public ConsumerInfo() {
    }

    public ConsumerInfo(ConsumerId consumerId) {
        this.consumerId = consumerId;
    }

    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
    }

    /**
     * @return a new ConsumerInfo populated via {@link #copy(ConsumerInfo)}.
     */
    public ConsumerInfo copy() {
        ConsumerInfo info = new ConsumerInfo();
        copy(info);
        return info;
    }

    /**
     * Copies this command's state into {@code info}. Network consumer ids are
     * appended to any ids {@code info} already holds.
     * <p>
     * NOTE(review): optimizedAcknowledge, noRangeAcks, additionalPredicate,
     * currentPrefetchSize, lastDeliveredSequenceId and the assigned group
     * counts are not copied here - confirm whether that is intentional.
     *
     * @param info the instance to copy into
     */
    public void copy(ConsumerInfo info) {
        super.copy(info);
        info.consumerId = consumerId;
        info.destination = destination;
        info.prefetchSize = prefetchSize;
        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
        info.browser = browser;
        info.dispatchAsync = dispatchAsync;
        info.selector = selector;
        info.clientId = clientId;
        info.subscriptionName = subscriptionName;
        info.noLocal = noLocal;
        info.exclusive = exclusive;
        info.retroactive = retroactive;
        info.priority = priority;
        info.brokerPath = brokerPath;
        info.networkSubscription = networkSubscription;
        if (networkConsumerIds != null) {
            if (info.networkConsumerIds == null) {
                info.networkConsumerIds = new ArrayList<ConsumerId>();
            }
            info.networkConsumerIds.addAll(networkConsumerIds);
        }
    }

    /**
     * @return true if a subscription name has been set.
     */
    public boolean isDurable() {
        return subscriptionName != null;
    }

    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * Is used to uniquely identify the consumer to the broker.
     *
     * @openwire:property version=1 cache=true
     */
    public ConsumerId getConsumerId() {
        return consumerId;
    }

    public void setConsumerId(ConsumerId consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * Is this consumer a queue browser?
     *
     * @openwire:property version=1
     */
    public boolean isBrowser() {
        return browser;
    }

    public void setBrowser(boolean browser) {
        this.browser = browser;
    }

    /**
     * The destination that the consumer is interested in receiving messages
     * from. This destination could be a composite destination.
     *
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    public void setDestination(ActiveMQDestination destination) {
        this.destination = destination;
    }

    /**
     * How many messages a broker will send to the client without receiving an
     * ack before it stops dispatching messages to the client.
     *
     * @openwire:property version=1
     */
    public int getPrefetchSize() {
        return prefetchSize;
    }

    /**
     * Sets the prefetch size and resets the current prefetch size to the same
     * value.
     */
    public void setPrefetchSize(int prefetchSize) {
        this.prefetchSize = prefetchSize;
        this.currentPrefetchSize = prefetchSize;
    }

    /**
     * How many messages a broker will keep around, above the prefetch limit,
     * for non-durable topics before starting to discard older messages.
     *
     * @openwire:property version=1
     */
    public int getMaximumPendingMessageLimit() {
        return maximumPendingMessageLimit;
    }

    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
    }

    /**
     * Should the broker dispatch a message to the consumer async? If it does
     * so async, then it uses a more SEDA style of processing, while if it is
     * not done async, then the broker uses a STP style of processing. STP is
     * more appropriate in high bandwidth situations or when being used by an
     * in-vm transport.
     *
     * @openwire:property version=1
     */
    public boolean isDispatchAsync() {
        return dispatchAsync;
    }

    public void setDispatchAsync(boolean dispatchAsync) {
        this.dispatchAsync = dispatchAsync;
    }

    /**
     * The JMS selector used to filter out messages that this consumer is
     * interested in.
     *
     * @openwire:property version=1
     */
    public String getSelector() {
        return selector;
    }

    public void setSelector(String selector) {
        this.selector = selector;
    }

    /**
     * Used to identify the id of a client connection.
     *
     * @openwire:property version=10
     */
    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Used to identify the name of a durable subscription.
     *
     * @openwire:property version=1
     */
    public String getSubscriptionName() {
        return subscriptionName;
    }

    public void setSubscriptionName(String durableSubscriptionId) {
        this.subscriptionName = durableSubscriptionId;
    }

    /**
     * Set noLocal to true to avoid receiving messages that were published
     * locally on the same connection.
     *
     * @openwire:property version=1
     */
    public boolean isNoLocal() {
        return noLocal;
    }

    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    /**
     * An exclusive consumer locks out other consumers from being able to
     * receive messages from the destination. If there are multiple exclusive
     * consumers for a destination, the first one created will be the exclusive
     * consumer of the destination.
     *
     * @openwire:property version=1
     */
    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    /**
     * A retroactive consumer only has meaning for Topics. It allows a consumer
     * to retroactively see messages sent prior to the consumer being created.
     * If the consumer is not durable, it will be delivered the last message
     * published to the topic. If the consumer is durable then it will receive
     * all persistent messages that are still stored in persistent storage for
     * that topic.
     *
     * @openwire:property version=1
     */
    public boolean isRetroactive() {
        return retroactive;
    }

    public void setRetroactive(boolean retroactive) {
        this.retroactive = retroactive;
    }

    /**
     * Builds the RemoveInfo for this consumer's id, carrying over this
     * command's response-required flag.
     *
     * @return the remove command for this consumer
     */
    public RemoveInfo createRemoveCommand() {
        RemoveInfo command = new RemoveInfo(getConsumerId());
        command.setResponseRequired(isResponseRequired());
        return command;
    }

    /**
     * The broker will avoid dispatching to a lower priority consumer if there
     * are other higher priority consumers available to dispatch to. This allows
     * letting the broker to have an affinity to higher priority consumers.
     * Default priority is 0.
     *
     * @openwire:property version=1
     */
    public byte getPriority() {
        return priority;
    }

    public void setPriority(byte priority) {
        this.priority = priority;
    }

    /**
     * The route of brokers the command has moved through.
     *
     * @openwire:property version=1 cache=true
     */
    public BrokerId[] getBrokerPath() {
        return brokerPath;
    }

    public void setBrokerPath(BrokerId[] brokerPath) {
        this.brokerPath = brokerPath;
    }

    /**
     * A transient additional predicate that can be used to inject additional
     * predicates into the selector on the fly. Handy if, say, a Security
     * Broker interceptor wants to filter out messages based on security level
     * of the consumer.
     *
     * @openwire:property version=1
     */
    public BooleanExpression getAdditionalPredicate() {
        return additionalPredicate;
    }

    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
        this.additionalPredicate = additionalPredicate;
    }

    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processAddConsumer(this);
    }

    /**
     * @openwire:property version=1
     * @return Returns the networkSubscription.
     */
    public boolean isNetworkSubscription() {
        return networkSubscription;
    }

    /**
     * @param networkSubscription The networkSubscription to set.
     */
    public void setNetworkSubscription(boolean networkSubscription) {
        this.networkSubscription = networkSubscription;
    }

    /**
     * @openwire:property version=1
     * @return Returns the optimizedAcknowledge.
     */
    public boolean isOptimizedAcknowledge() {
        return optimizedAcknowledge;
    }

    /**
     * @param optimizedAcknowledge The optimizedAcknowledge to set.
     */
    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
        this.optimizedAcknowledge = optimizedAcknowledge;
    }

    /**
     * @return Returns the currentPrefetchSize.
     */
    public int getCurrentPrefetchSize() {
        return currentPrefetchSize;
    }

    /**
     * @param currentPrefetchSize The currentPrefetchSize to set.
     */
    public void setCurrentPrefetchSize(int currentPrefetchSize) {
        this.currentPrefetchSize = currentPrefetchSize;
    }

    /**
     * The broker may be able to optimize its processing or provide better QOS
     * if it knows the consumer will not be sending ranged acks.
     *
     * @return true if the consumer will not send range acks.
     * @openwire:property version=1
     */
    public boolean isNoRangeAcks() {
        return noRangeAcks;
    }

    public void setNoRangeAcks(boolean noRangeAcks) {
        this.noRangeAcks = noRangeAcks;
    }

    /**
     * Appends a network consumer id, creating the backing list on first use.
     */
    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
        if (networkConsumerIds == null) {
            networkConsumerIds = new ArrayList<ConsumerId>();
        }
        networkConsumerIds.add(networkConsumerId);
    }

    /**
     * Removes a network consumer id; the backing list is dropped (set to null)
     * once it becomes empty.
     */
    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
        if (networkConsumerIds != null) {
            networkConsumerIds.remove(networkConsumerId);
            if (networkConsumerIds.isEmpty()) {
                networkConsumerIds = null;
            }
        }
    }

    public synchronized boolean isNetworkConsumersEmpty() {
        return networkConsumerIds == null || networkConsumerIds.isEmpty();
    }

    /**
     * @return a new list holding the current network consumer ids; never null.
     */
    public synchronized List<ConsumerId> getNetworkConsumerIds() {
        List<ConsumerId> result = new ArrayList<ConsumerId>();
        if (networkConsumerIds != null) {
            result.addAll(networkConsumerIds);
        }
        return result;
    }

    @Override
    public int hashCode() {
        return (consumerId == null) ? 0 : consumerId.hashCode();
    }

    /**
     * Equality is based solely on the consumer id. Two instances whose
     * consumer ids are both null are equal, matching {@link #hashCode()}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }

        ConsumerInfo other = (ConsumerInfo) obj;

        // Null-safe comparison: never dereference a null consumerId.
        if (consumerId == null) {
            return other.consumerId == null;
        }
        return consumerId.equals(other.consumerId);
    }

    /**
     * Tracks the original subscription id that causes a subscription to
     * percolate through a network when networkTTL > 1. Tracking the original
     * subscription allows duplicate suppression.
     * <p>
     * Synchronized like the other network consumer id accessors so the list
     * cannot be nulled out by {@link #removeNetworkConsumerId(ConsumerId)}
     * between the null check and the array copy.
     *
     * @return array of the current subscription path, or null if there is none
     * @openwire:property version=4
     */
    public synchronized ConsumerId[] getNetworkConsumerPath() {
        ConsumerId[] result = null;
        if (networkConsumerIds != null) {
            result = networkConsumerIds.toArray(new ConsumerId[0]);
        }
        return result;
    }

    /**
     * Adds every id in {@code consumerPath} to the network consumer ids; a
     * null array is ignored. Existing ids are kept.
     */
    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
        if (consumerPath != null) {
            for (int i = 0; i < consumerPath.length; i++) {
                addNetworkConsumerId(consumerPath[i]);
            }
        }
    }

    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
        this.lastDeliveredSequenceId = lastDeliveredSequenceId;
    }

    public long getLastDeliveredSequenceId() {
        return lastDeliveredSequenceId;
    }

    /**
     * Increments the assigned group count for {@code dest}, creating the
     * counter on first use.
     */
    public void incrementAssignedGroupCount(final ActiveMQDestination dest) {
        AtomicLong value = assignedGroupCount.get(dest);
        if (value == null) {
            // putIfAbsent is atomic: if another thread installed a counter
            // first, use theirs so no increment is lost.
            final AtomicLong created = new AtomicLong(0);
            value = assignedGroupCount.putIfAbsent(dest, created);
            if (value == null) {
                value = created;
            }
        }
        value.incrementAndGet();
    }

    public void clearAssignedGroupCount(final ActiveMQDestination dest) {
        assignedGroupCount.remove(dest);
    }

    /**
     * Decrements the assigned group count for {@code dest}; does nothing if
     * no counter exists for it.
     */
    public void decrementAssignedGroupCount(final ActiveMQDestination dest) {
        AtomicLong value = assignedGroupCount.get(dest);
        if (value != null) {
            value.decrementAndGet();
        }
    }

    /**
     * @return the assigned group count for {@code dest}, or 0 if none is recorded.
     */
    public long getAssignedGroupCount(final ActiveMQDestination dest) {
        long result = 0L;
        AtomicLong value = assignedGroupCount.get(dest);
        if (value != null) {
            result = value.longValue();
        }
        return result;
    }

    /**
     * Resets the non-marshalled state: a fresh group-count map and an unset
     * last delivered sequence id.
     */
    @Override
    public void initTransients() {
        assignedGroupCount = new ConcurrentHashMap<>();
        lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
    }

}