/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.state.CommandVisitor;

/**
 * Command describing a consumer: its id, destination, prefetch settings,
 * selector, subscription details and network routing information.
 * Two instances are equal when their consumer ids are equal.
 *
 * @openwire:marshaller code="5"
 *
 */
public class ConsumerInfo extends BaseCommand implements TransientInitializer {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;

    public static final byte HIGH_PRIORITY = 10;
    public static final byte NORMAL_PRIORITY = 0;
    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
    public static final byte LOW_PRIORITY = -10;

    protected ConsumerId consumerId;
    protected ActiveMQDestination destination;
    protected int prefetchSize;
    protected int maximumPendingMessageLimit;
    protected boolean browser;
    protected boolean dispatchAsync;
    protected String selector;
    protected String clientId;
    protected String subscriptionName;
    protected boolean noLocal;
    protected boolean exclusive;
    protected boolean retroactive;
    protected byte priority;
    protected BrokerId[] brokerPath;
    protected boolean optimizedAcknowledge;
    // used by the broker
    protected transient int currentPrefetchSize;
    // if true, the consumer will not send range acks.
    protected boolean noRangeAcks;

    protected BooleanExpression additionalPredicate;
    // this subscription originated from a network connection
    protected transient boolean networkSubscription;
    // the original consumerId
    protected transient List<ConsumerId> networkConsumerIds;

    // not marshalled, populated from RemoveInfo, the last message delivered, used
    // to suppress redelivery on prefetched messages after close
    private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
    private transient Map<ActiveMQDestination, AtomicLong> assignedGroupCount = new ConcurrentHashMap<>();

    public ConsumerInfo() {
    }

    public ConsumerInfo(ConsumerId consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * Creates a consumer whose id is built from the session id of the given
     * session and the supplied numeric consumer value.
     */
    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
    }

    /**
     * @return a new ConsumerInfo populated via {@link #copy(ConsumerInfo)}.
     */
    public ConsumerInfo copy() {
        ConsumerInfo info = new ConsumerInfo();
        copy(info);
        return info;
    }

    /**
     * Copies this consumer's state into {@code info}.
     * <p>
     * Only the fields assigned below are transferred; other fields (for
     * example optimizedAcknowledge, noRangeAcks, additionalPredicate and
     * currentPrefetchSize) are left untouched on the target. The network
     * consumer ids are appended to any ids the target already holds.
     *
     * @param info the instance to populate
     */
    public void copy(ConsumerInfo info) {
        super.copy(info);
        info.consumerId = consumerId;
        info.destination = destination;
        info.prefetchSize = prefetchSize;
        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
        info.browser = browser;
        info.dispatchAsync = dispatchAsync;
        info.selector = selector;
        info.clientId = clientId;
        info.subscriptionName = subscriptionName;
        info.noLocal = noLocal;
        info.exclusive = exclusive;
        info.retroactive = retroactive;
        info.priority = priority;
        info.brokerPath = brokerPath;
        info.networkSubscription = networkSubscription;
        if (networkConsumerIds != null) {
            if (info.networkConsumerIds == null) {
                info.networkConsumerIds = new ArrayList<>();
            }
            info.networkConsumerIds.addAll(networkConsumerIds);
        }
    }

    /**
     * @return true when a subscription name has been set.
     */
    public boolean isDurable() {
        return subscriptionName != null;
    }

    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * Is used to uniquely identify the consumer to the broker.
     *
     * @openwire:property version=1 cache=true
     */
    public ConsumerId getConsumerId() {
        return consumerId;
    }

    public void setConsumerId(ConsumerId consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * Is this consumer a queue browser?
     *
     * @openwire:property version=1
     */
    public boolean isBrowser() {
        return browser;
    }

    public void setBrowser(boolean browser) {
        this.browser = browser;
    }

    /**
     * The destination that the consumer is interested in receiving messages
     * from. This destination could be a composite destination.
     *
     * @openwire:property version=1 cache=true
     */
    public ActiveMQDestination getDestination() {
        return destination;
    }

    public void setDestination(ActiveMQDestination destination) {
        this.destination = destination;
    }

    /**
     * How many messages a broker will send to the client without receiving an
     * ack before it stops dispatching messages to the client.
     *
     * @openwire:property version=1
     */
    public int getPrefetchSize() {
        return prefetchSize;
    }

    /**
     * Sets the prefetch size and resets the current prefetch size to the
     * same value.
     */
    public void setPrefetchSize(int prefetchSize) {
        this.prefetchSize = prefetchSize;
        this.currentPrefetchSize = prefetchSize;
    }

    /**
     * How many messages a broker will keep around, above the prefetch limit,
     * for non-durable topics before starting to discard older messages.
     *
     * @openwire:property version=1
     */
    public int getMaximumPendingMessageLimit() {
        return maximumPendingMessageLimit;
    }

    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
    }

    /**
     * Should the broker dispatch a message to the consumer async? If it does
     * so async, then it uses a more SEDA style of processing, while if it is
     * not done async, then the broker uses an STP style of processing. STP is
     * more appropriate in high bandwidth situations or when being used by an
     * in-VM transport.
     *
     * @openwire:property version=1
     */
    public boolean isDispatchAsync() {
        return dispatchAsync;
    }

    public void setDispatchAsync(boolean dispatchAsync) {
        this.dispatchAsync = dispatchAsync;
    }

    /**
     * The JMS selector used to filter out messages that this consumer is
     * interested in.
     *
     * @openwire:property version=1
     */
    public String getSelector() {
        return selector;
    }

    public void setSelector(String selector) {
        this.selector = selector;
    }

    /**
     * Used to identify the id of a client connection.
     *
     * @openwire:property version=10
     */
    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Used to identify the name of a durable subscription.
     *
     * @openwire:property version=1
     */
    public String getSubscriptionName() {
        return subscriptionName;
    }

    public void setSubscriptionName(String durableSubscriptionId) {
        this.subscriptionName = durableSubscriptionId;
    }

    /**
     * Set noLocal to true to avoid receiving messages that were published
     * locally on the same connection.
     *
     * @openwire:property version=1
     */
    public boolean isNoLocal() {
        return noLocal;
    }

    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    /**
     * An exclusive consumer locks out other consumers from being able to
     * receive messages from the destination. If there are multiple exclusive
     * consumers for a destination, the first one created will be the exclusive
     * consumer of the destination.
     *
     * @openwire:property version=1
     */
    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    /**
     * A retroactive consumer only has meaning for Topics. It allows a consumer
     * to retroactively see messages sent prior to the consumer being created.
     * If the consumer is not durable, it will be delivered the last message
     * published to the topic. If the consumer is durable then it will receive
     * all persistent messages that are still stored in persistent storage for
     * that topic.
     *
     * @openwire:property version=1
     */
    public boolean isRetroactive() {
        return retroactive;
    }

    public void setRetroactive(boolean retroactive) {
        this.retroactive = retroactive;
    }

    /**
     * Builds a {@link RemoveInfo} for this consumer's id, carrying over this
     * command's response-required flag.
     *
     * @return the remove command
     */
    public RemoveInfo createRemoveCommand() {
        RemoveInfo command = new RemoveInfo(getConsumerId());
        command.setResponseRequired(isResponseRequired());
        return command;
    }

    /**
     * The broker will avoid dispatching to a lower priority consumer if there
     * are other higher priority consumers available to dispatch to. This allows
     * letting the broker have an affinity to higher priority consumers.
     * Default priority is 0.
     *
     * @openwire:property version=1
     */
    public byte getPriority() {
        return priority;
    }

    public void setPriority(byte priority) {
        this.priority = priority;
    }

    /**
     * The route of brokers the command has moved through.
     *
     * @openwire:property version=1 cache=true
     */
    public BrokerId[] getBrokerPath() {
        return brokerPath;
    }

    public void setBrokerPath(BrokerId[] brokerPath) {
        this.brokerPath = brokerPath;
    }

    /**
     * A transient additional predicate that can be used to inject additional
     * predicates into the selector on the fly. Handy if, say, a Security
     * Broker interceptor wants to filter out messages based on security level
     * of the consumer.
     *
     * @openwire:property version=1
     */
    public BooleanExpression getAdditionalPredicate() {
        return additionalPredicate;
    }

    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
        this.additionalPredicate = additionalPredicate;
    }

    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processAddConsumer(this);
    }

    /**
     * @openwire:property version=1
     * @return Returns the networkSubscription.
     */
    public boolean isNetworkSubscription() {
        return networkSubscription;
    }

    /**
     * @param networkSubscription The networkSubscription to set.
     */
    public void setNetworkSubscription(boolean networkSubscription) {
        this.networkSubscription = networkSubscription;
    }

    /**
     * @openwire:property version=1
     * @return Returns the optimizedAcknowledge.
     */
    public boolean isOptimizedAcknowledge() {
        return optimizedAcknowledge;
    }

    /**
     * @param optimizedAcknowledge The optimizedAcknowledge to set.
     */
    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
        this.optimizedAcknowledge = optimizedAcknowledge;
    }

    /**
     * @return Returns the currentPrefetchSize.
     */
    public int getCurrentPrefetchSize() {
        return currentPrefetchSize;
    }

    /**
     * @param currentPrefetchSize The currentPrefetchSize to set.
     */
    public void setCurrentPrefetchSize(int currentPrefetchSize) {
        this.currentPrefetchSize = currentPrefetchSize;
    }

    /**
     * The broker may be able to optimize its processing or provide better QOS
     * if it knows the consumer will not be sending ranged acks.
     *
     * @return true if the consumer will not send range acks.
     * @openwire:property version=1
     */
    public boolean isNoRangeAcks() {
        return noRangeAcks;
    }

    public void setNoRangeAcks(boolean noRangeAcks) {
        this.noRangeAcks = noRangeAcks;
    }

    /**
     * Appends a network consumer id, creating the backing list on first use.
     */
    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
        if (networkConsumerIds == null) {
            networkConsumerIds = new ArrayList<>();
        }
        networkConsumerIds.add(networkConsumerId);
    }

    /**
     * Removes a network consumer id; the backing list is discarded once it
     * becomes empty.
     */
    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
        if (networkConsumerIds != null) {
            networkConsumerIds.remove(networkConsumerId);
            if (networkConsumerIds.isEmpty()) {
                networkConsumerIds = null;
            }
        }
    }

    public synchronized boolean isNetworkConsumersEmpty() {
        return networkConsumerIds == null || networkConsumerIds.isEmpty();
    }

    /**
     * @return a new list holding the current network consumer ids; never
     *         null, empty when none are recorded.
     */
    public synchronized List<ConsumerId> getNetworkConsumerIds() {
        List<ConsumerId> result = new ArrayList<>();
        if (networkConsumerIds != null) {
            result.addAll(networkConsumerIds);
        }
        return result;
    }

    /**
     * Hash is derived solely from the consumer id; a null id hashes to 0.
     */
    @Override
    public int hashCode() {
        return (consumerId == null) ? 0 : consumerId.hashCode();
    }

    /**
     * Two ConsumerInfo instances of the same class are equal when their
     * consumer ids are equal. Instances whose ids are both null are equal,
     * which keeps this method consistent with {@link #hashCode()}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }

        ConsumerInfo other = (ConsumerInfo) obj;

        // Guard the null id explicitly: calling equals on a null consumerId
        // would throw when the other id is null as well.
        if (consumerId == null) {
            return other.consumerId == null;
        }
        return consumerId.equals(other.consumerId);
    }

    /**
     * Tracks the original subscription id that causes a subscription to
     * percolate through a network when networkTTL > 1. Tracking the original
     * subscription allows duplicate suppression.
     *
     * @return array of the current subscription path, or null when no
     *         network consumer ids are recorded
     * @openwire:property version=4
     */
    public ConsumerId[] getNetworkConsumerPath() {
        ConsumerId[] result = null;
        if (networkConsumerIds != null) {
            result = networkConsumerIds.toArray(new ConsumerId[0]);
        }
        return result;
    }

    /**
     * Adds every id in {@code consumerPath} to the network consumer ids.
     * Existing ids are kept; a null array is ignored.
     */
    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
        if (consumerPath != null) {
            for (ConsumerId id : consumerPath) {
                addNetworkConsumerId(id);
            }
        }
    }

    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
        this.lastDeliveredSequenceId = lastDeliveredSequenceId;
    }

    public long getLastDeliveredSequenceId() {
        return lastDeliveredSequenceId;
    }

    /**
     * Increments the assigned group count for {@code dest}, starting a new
     * counter at zero when none exists yet.
     */
    public void incrementAssignedGroupCount(final ActiveMQDestination dest) {
        AtomicLong value = assignedGroupCount.get(dest);
        if (value == null) {
            value = new AtomicLong(0);
            assignedGroupCount.put(dest, value);
        }
        value.incrementAndGet();
    }

    /**
     * Drops the assigned group counter for {@code dest}.
     */
    public void clearAssignedGroupCount(final ActiveMQDestination dest) {
        assignedGroupCount.remove(dest);
    }

    /**
     * Decrements the assigned group count for {@code dest}; does nothing when
     * no counter exists for it.
     */
    public void decrementAssignedGroupCount(final ActiveMQDestination dest) {
        AtomicLong value = assignedGroupCount.get(dest);
        if (value != null) {
            value.decrementAndGet();
        }
    }

    /**
     * @return the assigned group count for {@code dest}, or 0 when no counter
     *         exists for it.
     */
    public long getAssignedGroupCount(final ActiveMQDestination dest) {
        long result = 0L;
        AtomicLong value = assignedGroupCount.get(dest);
        if (value != null) {
            result = value.longValue();
        }
        return result;
    }

    /**
     * Resets the non-marshalled state: a fresh group-count map and an unset
     * last-delivered sequence id.
     */
    @Override
    public void initTransients() {
        assignedGroupCount = new ConcurrentHashMap<>();
        lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
    }

    @Override
    public String toString() {
        HashMap<String, Object> overrideFields = new HashMap<>();
        overrideFields.put("networkConsumerIds", networkConsumerIds);
        return super.toString(overrideFields);
    }

}