001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.Set; 020 021import org.apache.activemq.ActiveMQPrefetchPolicy; 022import org.apache.activemq.broker.Broker; 023import org.apache.activemq.broker.region.BaseDestination; 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.DurableTopicSubscription; 026import org.apache.activemq.broker.region.Queue; 027import org.apache.activemq.broker.region.QueueBrowserSubscription; 028import org.apache.activemq.broker.region.QueueSubscription; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.broker.region.Topic; 031import org.apache.activemq.broker.region.TopicSubscription; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.group.GroupFactoryFinder; 034import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 035import org.apache.activemq.filter.DestinationMapEntry; 036import org.apache.activemq.network.NetworkBridgeFilterFactory; 037import org.apache.activemq.usage.SystemUsage; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Represents an entry in a {@link PolicyMap} for assigning policies to a 043 * specific destination or a hierarchical wildcard area of destinations. 044 * 045 * @org.apache.xbean.XBean 046 * 047 */ 048public class PolicyEntry extends DestinationMapEntry { 049 050 private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class); 051 private DispatchPolicy dispatchPolicy; 052 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 053 private boolean sendAdvisoryIfNoConsumers; 054 private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY; 055 private PendingMessageLimitStrategy pendingMessageLimitStrategy; 056 private MessageEvictionStrategy messageEvictionStrategy; 057 private long memoryLimit; 058 private String messageGroupMapFactoryType = "cached"; 059 private MessageGroupMapFactory messageGroupMapFactory; 060 private PendingQueueMessageStoragePolicy pendingQueuePolicy; 061 private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; 062 private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; 063 private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT; 064 private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 065 private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 066 private boolean enableAudit=true; 067 private boolean producerFlowControl = true; 068 private boolean alwaysRetroactive = false; 069 private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 070 private boolean optimizedDispatch=false; 071 private int maxPageSize=BaseDestination.MAX_PAGE_SIZE; 072 private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE; 073 private boolean useCache=true; 074 private long minimumMessageSize=1024; 075 private boolean useConsumerPriority=true; 076 private boolean strictOrderDispatch=false; 077 private boolean lazyDispatch=false; 078 private int timeBeforeDispatchStarts = 0; 079 private int consumersBeforeDispatchStarts = 0; 080 private boolean advisoryForSlowConsumers; 081 private boolean advisoryForFastProducers; 082 private boolean advisoryForDiscardingMessages; 083 private boolean advisoryWhenFull; 084 private boolean advisoryForDelivery; 085 private boolean advisoryForConsumed; 086 private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; 087 private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; 088 private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; 089 private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; 090 private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; 091 private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; 092 private boolean usePrefetchExtension = true; 093 private int cursorMemoryHighWaterMark = 70; 094 private int storeUsageHighWaterMark = 100; 095 private SlowConsumerStrategy slowConsumerStrategy; 096 private boolean prioritizedMessages; 097 private boolean allConsumersExclusiveByDefault; 098 private boolean gcInactiveDestinations; 099 private boolean gcWithNetworkConsumers; 100 private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 101 private boolean reduceMemoryFootprint; 102 private NetworkBridgeFilterFactory networkBridgeFilterFactory; 103 private boolean doOptimzeMessageStorage = true; 104 /* 105 * percentage of in-flight messages above which optimize message store is disabled 106 */ 107 private int optimizeMessageStoreInFlightLimit = 10; 108 private boolean persistJMSRedelivered = false; 109 private int sendFailIfNoSpace = -1; 110 private long sendFailIfNoSpaceAfterTimeout = -1; 111 112 113 public void configure(Broker broker,Queue queue) { 114 baseConfiguration(broker,queue); 115 if (dispatchPolicy != null) { 116 queue.setDispatchPolicy(dispatchPolicy); 117 } 118 queue.setDeadLetterStrategy(getDeadLetterStrategy()); 119 queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); 120 if (memoryLimit > 0) { 121 queue.getMemoryUsage().setLimit(memoryLimit); 122 } 123 if (pendingQueuePolicy != null) { 124 PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); 125 queue.setMessages(messages); 126 } 127 128 queue.setUseConsumerPriority(isUseConsumerPriority()); 129 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 130 queue.setOptimizedDispatch(isOptimizedDispatch()); 131 queue.setLazyDispatch(isLazyDispatch()); 132 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 133 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 134 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 135 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 136 } 137 138 public void update(Queue queue) { 139 update(queue, null); 140 } 141 142 /** 143 * Update a queue with this policy. Only apply properties that 144 * match the includedProperties list. Not all properties are eligible 145 * to be updated. 146 * 147 * If includedProperties is null then all of the properties will be set as 148 * isUpdate will return true 149 * @param baseDestination 150 * @param includedProperties 151 */ 152 public void update(Queue queue, Set<String> includedProperties) { 153 baseUpdate(queue, includedProperties); 154 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 155 queue.getMemoryUsage().setLimit(memoryLimit); 156 } 157 if (isUpdate("useConsumerPriority", includedProperties)) { 158 queue.setUseConsumerPriority(isUseConsumerPriority()); 159 } 160 if (isUpdate("strictOrderDispatch", includedProperties)) { 161 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 162 } 163 if (isUpdate("optimizedDispatch", includedProperties)) { 164 queue.setOptimizedDispatch(isOptimizedDispatch()); 165 } 166 if (isUpdate("lazyDispatch", includedProperties)) { 167 queue.setLazyDispatch(isLazyDispatch()); 168 } 169 if (isUpdate("timeBeforeDispatchStarts", includedProperties)) { 170 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 171 } 172 if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) { 173 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 174 } 175 if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) { 176 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 177 } 178 if (isUpdate("persistJMSRedelivered", includedProperties)) { 179 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 180 } 181 } 182 183 public void configure(Broker broker,Topic topic) { 184 baseConfiguration(broker,topic); 185 if (dispatchPolicy != null) { 186 topic.setDispatchPolicy(dispatchPolicy); 187 } 188 topic.setDeadLetterStrategy(getDeadLetterStrategy()); 189 if (subscriptionRecoveryPolicy != null) { 190 SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy(); 191 srp.setBroker(broker); 192 topic.setSubscriptionRecoveryPolicy(srp); 193 } 194 if (memoryLimit > 0) { 195 topic.getMemoryUsage().setLimit(memoryLimit); 196 } 197 topic.setLazyDispatch(isLazyDispatch()); 198 } 199 200 public void update(Topic topic) { 201 update(topic, null); 202 } 203 204 //If includedProperties is null then all of the properties will be set as 205 //isUpdate will return true 206 public void update(Topic topic, Set<String> includedProperties) { 207 baseUpdate(topic, includedProperties); 208 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 209 topic.getMemoryUsage().setLimit(memoryLimit); 210 } 211 if (isUpdate("lazyDispatch", includedProperties)) { 212 topic.setLazyDispatch(isLazyDispatch()); 213 } 214 } 215 216 // attributes that can change on the fly 217 public void baseUpdate(BaseDestination destination) { 218 baseUpdate(destination, null); 219 } 220 221 // attributes that can change on the fly 222 //If includedProperties is null then all of the properties will be set as 223 //isUpdate will return true 224 public void baseUpdate(BaseDestination destination, Set<String> includedProperties) { 225 if (isUpdate("producerFlowControl", includedProperties)) { 226 destination.setProducerFlowControl(isProducerFlowControl()); 227 } 228 if (isUpdate("alwaysRetroactive", includedProperties)) { 229 destination.setAlwaysRetroactive(isAlwaysRetroactive()); 230 } 231 if (isUpdate("blockedProducerWarningInterval", includedProperties)) { 232 destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); 233 } 234 if (isUpdate("maxPageSize", includedProperties)) { 235 destination.setMaxPageSize(getMaxPageSize()); 236 } 237 if (isUpdate("maxBrowsePageSize", includedProperties)) { 238 destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); 239 } 240 241 if (isUpdate("minimumMessageSize", includedProperties)) { 242 destination.setMinimumMessageSize((int) getMinimumMessageSize()); 243 } 244 if (isUpdate("maxExpirePageSize", includedProperties)) { 245 destination.setMaxExpirePageSize(getMaxExpirePageSize()); 246 } 247 if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) { 248 destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 249 } 250 if (isUpdate("storeUsageHighWaterMark", includedProperties)) { 251 destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); 252 } 253 if (isUpdate("gcInactiveDestinations", includedProperties)) { 254 destination.setGcIfInactive(isGcInactiveDestinations()); 255 } 256 if (isUpdate("gcWithNetworkConsumers", includedProperties)) { 257 destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); 258 } 259 if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) { 260 destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC()); 261 } 262 if (isUpdate("reduceMemoryFootprint", includedProperties)) { 263 destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); 264 } 265 if (isUpdate("doOptimizeMessageStore", includedProperties)) { 266 destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); 267 } 268 if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) { 269 destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); 270 } 271 if (isUpdate("advisoryForConsumed", includedProperties)) { 272 destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); 273 } 274 if (isUpdate("advisoryForDelivery", includedProperties)) { 275 destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); 276 } 277 if (isUpdate("advisoryForDiscardingMessages", includedProperties)) { 278 destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); 279 } 280 if (isUpdate("advisoryForSlowConsumers", includedProperties)) { 281 destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); 282 } 283 if (isUpdate("advisoryForFastProducers", includedProperties)) { 284 destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); 285 } 286 if (isUpdate("advisoryWhenFull", includedProperties)) { 287 destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); 288 } 289 if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) { 290 destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); 291 } 292 } 293 294 public void baseConfiguration(Broker broker, BaseDestination destination) { 295 baseUpdate(destination); 296 destination.setEnableAudit(isEnableAudit()); 297 destination.setMaxAuditDepth(getMaxQueueAuditDepth()); 298 destination.setMaxProducersToAudit(getMaxProducersToAudit()); 299 destination.setUseCache(isUseCache()); 300 destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); 301 SlowConsumerStrategy scs = getSlowConsumerStrategy(); 302 if (scs != null) { 303 scs.setBrokerService(broker); 304 scs.addDestination(destination); 305 } 306 destination.setSlowConsumerStrategy(scs); 307 destination.setPrioritizedMessages(isPrioritizedMessages()); 308 if (sendFailIfNoSpace != -1) { 309 destination.getSystemUsage().setSendFailIfNoSpace(isSendFailIfNoSpace()); 310 } 311 if (sendFailIfNoSpaceAfterTimeout != -1) { 312 destination.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(getSendFailIfNoSpaceAfterTimeout()); 313 } 314 } 315 316 public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { 317 configurePrefetch(subscription); 318 subscription.setUsePrefetchExtension(isUsePrefetchExtension()); 319 subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 320 if (pendingMessageLimitStrategy != null) { 321 int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); 322 int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); 323 if (consumerLimit > 0) { 324 if (value < 0 || consumerLimit < value) { 325 value = consumerLimit; 326 } 327 } 328 if (value >= 0) { 329 LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId()); 330 subscription.setMaximumPendingMessages(value); 331 } 332 } 333 if (messageEvictionStrategy != null) { 334 subscription.setMessageEvictionStrategy(messageEvictionStrategy); 335 } 336 if (pendingSubscriberPolicy != null) { 337 String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); 338 int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); 339 subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription)); 340 } 341 if (enableAudit) { 342 subscription.setEnableAudit(enableAudit); 343 subscription.setMaxProducersToAudit(maxProducersToAudit); 344 subscription.setMaxAuditDepth(maxAuditDepth); 345 } 346 } 347 348 public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) { 349 String clientId = sub.getSubscriptionKey().getClientId(); 350 String subName = sub.getSubscriptionKey().getSubscriptionName(); 351 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 352 configurePrefetch(sub); 353 if (pendingDurableSubscriberPolicy != null) { 354 PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub); 355 cursor.setSystemUsage(memoryManager); 356 sub.setPending(cursor); 357 } 358 int auditDepth = getMaxAuditDepth(); 359 if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) { 360 sub.setMaxAuditDepth(auditDepth * 10); 361 } else { 362 sub.setMaxAuditDepth(auditDepth); 363 } 364 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 365 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 366 } 367 368 public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { 369 configurePrefetch(sub); 370 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 371 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 372 373 // TODO 374 // We currently need an infinite audit because of the way that browser dispatch 375 // is done. We should refactor the browsers to better handle message dispatch so 376 // we can remove this and perform a more efficient dispatch. 377 sub.setMaxProducersToAudit(Integer.MAX_VALUE); 378 sub.setMaxAuditDepth(Short.MAX_VALUE); 379 380 // part solution - dispatching to browsers needs to be restricted 381 sub.setMaxMessages(getMaxBrowsePageSize()); 382 } 383 384 public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { 385 configurePrefetch(sub); 386 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 387 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 388 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 389 } 390 391 public void configurePrefetch(Subscription subscription) { 392 393 final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize(); 394 if (subscription instanceof QueueBrowserSubscription) { 395 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) { 396 ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch()); 397 } 398 } else if (subscription instanceof QueueSubscription) { 399 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) { 400 ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch()); 401 } 402 } else if (subscription instanceof DurableTopicSubscription) { 403 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || 404 subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) { 405 ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch()); 406 } 407 } else if (subscription instanceof TopicSubscription) { 408 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) { 409 ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch()); 410 } 411 } 412 if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) { 413 // tell the sub so that it can issue a pull request 414 subscription.updateConsumerPrefetch(0); 415 } 416 } 417 418 private boolean isUpdate(String property, Set<String> includedProperties) { 419 return includedProperties == null || includedProperties.contains(property); 420 } 421 // Properties 422 // ------------------------------------------------------------------------- 423 public DispatchPolicy getDispatchPolicy() { 424 return dispatchPolicy; 425 } 426 427 public void setDispatchPolicy(DispatchPolicy policy) { 428 this.dispatchPolicy = policy; 429 } 430 431 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 432 return subscriptionRecoveryPolicy; 433 } 434 435 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 436 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 437 } 438 439 public boolean isSendAdvisoryIfNoConsumers() { 440 return sendAdvisoryIfNoConsumers; 441 } 442 443 /** 444 * Sends an advisory message if a non-persistent message is sent and there 445 * are no active consumers 446 */ 447 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 448 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 449 } 450 451 public DeadLetterStrategy getDeadLetterStrategy() { 452 return deadLetterStrategy; 453 } 454 455 /** 456 * Sets the policy used to determine which dead letter queue destination 457 * should be used 458 */ 459 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 460 this.deadLetterStrategy = deadLetterStrategy; 461 } 462 463 public PendingMessageLimitStrategy getPendingMessageLimitStrategy() { 464 return pendingMessageLimitStrategy; 465 } 466 467 /** 468 * Sets the strategy to calculate the maximum number of messages that are 469 * allowed to be pending on consumers (in addition to their prefetch sizes). 470 * Once the limit is reached, non-durable topics can then start discarding 471 * old messages. This allows us to keep dispatching messages to slow 472 * consumers while not blocking fast consumers and discarding the messages 473 * oldest first. 474 */ 475 public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) { 476 this.pendingMessageLimitStrategy = pendingMessageLimitStrategy; 477 } 478 479 public MessageEvictionStrategy getMessageEvictionStrategy() { 480 return messageEvictionStrategy; 481 } 482 483 /** 484 * Sets the eviction strategy used to decide which message to evict when the 485 * slow consumer needs to discard messages 486 */ 487 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 488 this.messageEvictionStrategy = messageEvictionStrategy; 489 } 490 491 public long getMemoryLimit() { 492 return memoryLimit; 493 } 494 495 /** 496 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 497 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 498 */ 499 public void setMemoryLimit(long memoryLimit) { 500 this.memoryLimit = memoryLimit; 501 } 502 503 public MessageGroupMapFactory getMessageGroupMapFactory() { 504 if (messageGroupMapFactory == null) { 505 try { 506 messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType()); 507 }catch(Exception e){ 508 LOG.error("Failed to create message group Factory ",e); 509 } 510 } 511 return messageGroupMapFactory; 512 } 513 514 /** 515 * Sets the factory used to create new instances of {MessageGroupMap} used 516 * to implement the <a 517 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 518 * functionality. 519 */ 520 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 521 this.messageGroupMapFactory = messageGroupMapFactory; 522 } 523 524 525 public String getMessageGroupMapFactoryType() { 526 return messageGroupMapFactoryType; 527 } 528 529 public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) { 530 this.messageGroupMapFactoryType = messageGroupMapFactoryType; 531 } 532 533 534 /** 535 * @return the pendingDurableSubscriberPolicy 536 */ 537 public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { 538 return this.pendingDurableSubscriberPolicy; 539 } 540 541 /** 542 * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy 543 * to set 544 */ 545 public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { 546 this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy; 547 } 548 549 /** 550 * @return the pendingQueuePolicy 551 */ 552 public PendingQueueMessageStoragePolicy getPendingQueuePolicy() { 553 return this.pendingQueuePolicy; 554 } 555 556 /** 557 * @param pendingQueuePolicy the pendingQueuePolicy to set 558 */ 559 public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) { 560 this.pendingQueuePolicy = pendingQueuePolicy; 561 } 562 563 /** 564 * @return the pendingSubscriberPolicy 565 */ 566 public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() { 567 return this.pendingSubscriberPolicy; 568 } 569 570 /** 571 * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set 572 */ 573 public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) { 574 this.pendingSubscriberPolicy = pendingSubscriberPolicy; 575 } 576 577 /** 578 * @return true if producer flow control enabled 579 */ 580 public boolean isProducerFlowControl() { 581 return producerFlowControl; 582 } 583 584 /** 585 * @param producerFlowControl 586 */ 587 public void setProducerFlowControl(boolean producerFlowControl) { 588 this.producerFlowControl = producerFlowControl; 589 } 590 591 /** 592 * @return true if topic is always retroactive 593 */ 594 public boolean isAlwaysRetroactive() { 595 return alwaysRetroactive; 596 } 597 598 /** 599 * @param alwaysRetroactive 600 */ 601 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 602 this.alwaysRetroactive = alwaysRetroactive; 603 } 604 605 606 /** 607 * Set's the interval at which warnings about producers being blocked by 608 * resource usage will be triggered. Values of 0 or less will disable 609 * warnings 610 * 611 * @param blockedProducerWarningInterval the interval at which warning about 612 * blocked producers will be triggered. 613 */ 614 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 615 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 616 } 617 618 /** 619 * 620 * @return the interval at which warning about blocked producers will be 621 * triggered. 622 */ 623 public long getBlockedProducerWarningInterval() { 624 return blockedProducerWarningInterval; 625 } 626 627 /** 628 * @return the maxProducersToAudit 629 */ 630 public int getMaxProducersToAudit() { 631 return maxProducersToAudit; 632 } 633 634 /** 635 * @param maxProducersToAudit the maxProducersToAudit to set 636 */ 637 public void setMaxProducersToAudit(int maxProducersToAudit) { 638 this.maxProducersToAudit = maxProducersToAudit; 639 } 640 641 /** 642 * @return the maxAuditDepth 643 */ 644 public int getMaxAuditDepth() { 645 return maxAuditDepth; 646 } 647 648 /** 649 * @param maxAuditDepth the maxAuditDepth to set 650 */ 651 public void setMaxAuditDepth(int maxAuditDepth) { 652 this.maxAuditDepth = maxAuditDepth; 653 } 654 655 /** 656 * @return the enableAudit 657 */ 658 public boolean isEnableAudit() { 659 return enableAudit; 660 } 661 662 /** 663 * @param enableAudit the enableAudit to set 664 */ 665 public void setEnableAudit(boolean enableAudit) { 666 this.enableAudit = enableAudit; 667 } 668 669 public int getMaxQueueAuditDepth() { 670 return maxQueueAuditDepth; 671 } 672 673 public void setMaxQueueAuditDepth(int maxQueueAuditDepth) { 674 this.maxQueueAuditDepth = maxQueueAuditDepth; 675 } 676 677 public boolean isOptimizedDispatch() { 678 return optimizedDispatch; 679 } 680 681 public void setOptimizedDispatch(boolean optimizedDispatch) { 682 this.optimizedDispatch = optimizedDispatch; 683 } 684 685 public int getMaxPageSize() { 686 return maxPageSize; 687 } 688 689 public void setMaxPageSize(int maxPageSize) { 690 this.maxPageSize = maxPageSize; 691 } 692 693 public int getMaxBrowsePageSize() { 694 return maxBrowsePageSize; 695 } 696 697 public void setMaxBrowsePageSize(int maxPageSize) { 698 this.maxBrowsePageSize = maxPageSize; 699 } 700 701 public boolean isUseCache() { 702 return useCache; 703 } 704 705 public void setUseCache(boolean useCache) { 706 this.useCache = useCache; 707 } 708 709 public long getMinimumMessageSize() { 710 return minimumMessageSize; 711 } 712 713 public void setMinimumMessageSize(long minimumMessageSize) { 714 this.minimumMessageSize = minimumMessageSize; 715 } 716 717 public boolean isUseConsumerPriority() { 718 return useConsumerPriority; 719 } 720 721 public void setUseConsumerPriority(boolean useConsumerPriority) { 722 this.useConsumerPriority = useConsumerPriority; 723 } 724 725 public boolean isStrictOrderDispatch() { 726 return strictOrderDispatch; 727 } 728 729 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 730 this.strictOrderDispatch = strictOrderDispatch; 731 } 732 733 public boolean isLazyDispatch() { 734 return lazyDispatch; 735 } 736 737 public void setLazyDispatch(boolean lazyDispatch) { 738 this.lazyDispatch = lazyDispatch; 739 } 740 741 public int getTimeBeforeDispatchStarts() { 742 return timeBeforeDispatchStarts; 743 } 744 745 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 746 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 747 } 748 749 public int getConsumersBeforeDispatchStarts() { 750 return consumersBeforeDispatchStarts; 751 } 752 753 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 754 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 755 } 756 757 /** 758 * @return the advisoryForSlowConsumers 759 */ 760 public boolean isAdvisoryForSlowConsumers() { 761 return advisoryForSlowConsumers; 762 } 763 764 /** 765 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 766 */ 767 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 768 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 769 } 770 771 /** 772 * @return the advisoryForDiscardingMessages 773 */ 774 public boolean isAdvisoryForDiscardingMessages() { 775 return advisoryForDiscardingMessages; 776 } 777 778 /** 779 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set 780 */ 781 public void setAdvisoryForDiscardingMessages( 782 boolean advisoryForDiscardingMessages) { 783 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 784 } 785 786 /** 787 * @return the advisoryWhenFull 788 */ 789 public boolean isAdvisoryWhenFull() { 790 return advisoryWhenFull; 791 } 792 793 /** 794 * @param advisoryWhenFull the advisoryWhenFull to set 795 */ 796 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 797 this.advisoryWhenFull = advisoryWhenFull; 798 } 799 800 /** 801 * @return the advisoryForDelivery 802 */ 803 public boolean isAdvisoryForDelivery() { 804 return advisoryForDelivery; 805 } 806 807 /** 808 * @param advisoryForDelivery the advisoryForDelivery to set 809 */ 810 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 811 this.advisoryForDelivery = advisoryForDelivery; 812 } 813 814 /** 815 * @return the advisoryForConsumed 816 */ 817 public boolean isAdvisoryForConsumed() { 818 return advisoryForConsumed; 819 } 820 821 /** 822 * @param advisoryForConsumed the advisoryForConsumed to set 823 */ 824 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 825 this.advisoryForConsumed = advisoryForConsumed; 826 } 827 828 /** 829 * @return the advisdoryForFastProducers 830 */ 831 public boolean isAdvisoryForFastProducers() { 832 return advisoryForFastProducers; 833 } 834 835 /** 836 * @param advisoryForFastProducers the advisdoryForFastProducers to set 837 */ 838 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 839 this.advisoryForFastProducers = advisoryForFastProducers; 840 } 841 842 public void setMaxExpirePageSize(int maxExpirePageSize) { 843 this.maxExpirePageSize = maxExpirePageSize; 844 } 845 846 public int getMaxExpirePageSize() { 847 return maxExpirePageSize; 848 } 849 850 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 851 this.expireMessagesPeriod = expireMessagesPeriod; 852 } 853 854 public long getExpireMessagesPeriod() { 855 return expireMessagesPeriod; 856 } 857 858 /** 859 * Get the queuePrefetch 860 * @return the queuePrefetch 861 */ 862 public int getQueuePrefetch() { 863 return this.queuePrefetch; 864 } 865 866 /** 867 * Set the queuePrefetch 868 * @param queuePrefetch the queuePrefetch to set 869 */ 870 public void setQueuePrefetch(int queuePrefetch) { 871 this.queuePrefetch = queuePrefetch; 872 } 873 874 /** 875 * Get the queueBrowserPrefetch 876 * @return the queueBrowserPrefetch 877 */ 878 public int getQueueBrowserPrefetch() { 879 return this.queueBrowserPrefetch; 880 } 881 882 /** 883 * Set the queueBrowserPrefetch 884 * @param queueBrowserPrefetch the queueBrowserPrefetch to set 885 */ 886 public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { 887 this.queueBrowserPrefetch = queueBrowserPrefetch; 888 } 889 890 /** 891 * Get the topicPrefetch 892 * @return the topicPrefetch 893 */ 894 public int getTopicPrefetch() { 895 return this.topicPrefetch; 896 } 897 898 /** 899 * Set the topicPrefetch 900 * @param topicPrefetch the topicPrefetch to set 901 */ 902 public void setTopicPrefetch(int topicPrefetch) { 903 this.topicPrefetch = topicPrefetch; 904 } 905 906 /** 907 * Get the durableTopicPrefetch 908 * @return the durableTopicPrefetch 909 */ 910 public int getDurableTopicPrefetch() { 911 return this.durableTopicPrefetch; 912 } 913 914 /** 915 * Set the durableTopicPrefetch 916 * @param durableTopicPrefetch the durableTopicPrefetch to set 917 */ 918 public void setDurableTopicPrefetch(int durableTopicPrefetch) { 919 this.durableTopicPrefetch = durableTopicPrefetch; 920 } 921 922 public boolean isUsePrefetchExtension() { 923 return this.usePrefetchExtension; 924 } 925 926 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 927 this.usePrefetchExtension = usePrefetchExtension; 928 } 929 930 public int getCursorMemoryHighWaterMark() { 931 return this.cursorMemoryHighWaterMark; 932 } 933 934 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 935 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 936 } 937 938 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 939 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 940 } 941 942 public int getStoreUsageHighWaterMark() { 943 return storeUsageHighWaterMark; 944 } 945 946 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 947 this.slowConsumerStrategy = slowConsumerStrategy; 948 } 949 950 public SlowConsumerStrategy getSlowConsumerStrategy() { 951 return this.slowConsumerStrategy; 952 } 953 954 955 public boolean isPrioritizedMessages() { 956 return this.prioritizedMessages; 957 } 958 959 public void setPrioritizedMessages(boolean prioritizedMessages) { 960 this.prioritizedMessages = prioritizedMessages; 961 } 962 963 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 964 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 965 } 966 967 public boolean isAllConsumersExclusiveByDefault() { 968 return allConsumersExclusiveByDefault; 969 } 970 971 public boolean isGcInactiveDestinations() { 972 return this.gcInactiveDestinations; 973 } 974 975 public void setGcInactiveDestinations(boolean gcInactiveDestinations) { 976 this.gcInactiveDestinations = gcInactiveDestinations; 977 } 978 979 /** 980 * @return the amount of time spent inactive before GC of the destination kicks in. 981 * 982 * @deprecated use getInactiveTimeoutBeforeGC instead. 983 */ 984 @Deprecated 985 public long getInactiveTimoutBeforeGC() { 986 return getInactiveTimeoutBeforeGC(); 987 } 988 989 /** 990 * Sets the amount of time a destination is inactive before it is marked for GC 991 * 992 * @param inactiveTimoutBeforeGC 993 * time in milliseconds to configure as the inactive timeout. 994 * 995 * @deprecated use getInactiveTimeoutBeforeGC instead. 996 */ 997 @Deprecated 998 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { 999 setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC); 1000 } 1001 1002 /** 1003 * @return the amount of time spent inactive before GC of the destination kicks in. 1004 */ 1005 public long getInactiveTimeoutBeforeGC() { 1006 return this.inactiveTimeoutBeforeGC; 1007 } 1008 1009 /** 1010 * Sets the amount of time a destination is inactive before it is marked for GC 1011 * 1012 * @param inactiveTimoutBeforeGC 1013 * time in milliseconds to configure as the inactive timeout. 1014 */ 1015 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 1016 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 1017 } 1018 1019 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 1020 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 1021 } 1022 1023 public boolean isGcWithNetworkConsumers() { 1024 return gcWithNetworkConsumers; 1025 } 1026 1027 public boolean isReduceMemoryFootprint() { 1028 return reduceMemoryFootprint; 1029 } 1030 1031 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 1032 this.reduceMemoryFootprint = reduceMemoryFootprint; 1033 } 1034 1035 public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) { 1036 this.networkBridgeFilterFactory = networkBridgeFilterFactory; 1037 } 1038 1039 public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() { 1040 return networkBridgeFilterFactory; 1041 } 1042 1043 public boolean isDoOptimzeMessageStorage() { 1044 return doOptimzeMessageStorage; 1045 } 1046 1047 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 1048 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 1049 } 1050 1051 public int getOptimizeMessageStoreInFlightLimit() { 1052 return optimizeMessageStoreInFlightLimit; 1053 } 1054 1055 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 1056 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 1057 } 1058 1059 public void setPersistJMSRedelivered(boolean val) { 1060 this.persistJMSRedelivered = val; 1061 } 1062 1063 public boolean isPersistJMSRedelivered() { 1064 return persistJMSRedelivered; 1065 } 1066 1067 @Override 1068 public String toString() { 1069 return "PolicyEntry [" + destination + "]"; 1070 } 1071 1072 public void setSendFailIfNoSpace(boolean val) { 1073 if (val) { 1074 this.sendFailIfNoSpace = 1; 1075 } else { 1076 this.sendFailIfNoSpace = 0; 1077 } 1078 } 1079 1080 public boolean isSendFailIfNoSpace() { 1081 return sendFailIfNoSpace == 1; 1082 } 1083 1084 public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) { 1085 this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout; 1086 } 1087 1088 public long getSendFailIfNoSpaceAfterTimeout() { 1089 return this.sendFailIfNoSpaceAfterTimeout; 1090 } 1091}