001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import javax.jms.ResourceAllocationException; 024 025import org.apache.activemq.advisory.AdvisorySupport; 026import org.apache.activemq.broker.Broker; 027import org.apache.activemq.broker.BrokerService; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.broker.ProducerBrokerExchange; 030import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 031import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageDispatchNotification; 037import org.apache.activemq.command.ProducerInfo; 038import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 039import org.apache.activemq.security.SecurityContext; 040import org.apache.activemq.state.ProducerState; 041import org.apache.activemq.store.MessageStore; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.usage.MemoryUsage; 044import org.apache.activemq.usage.SystemUsage; 045import org.apache.activemq.usage.Usage; 046import org.slf4j.Logger; 047 048/** 049 * 050 */ 051public abstract class BaseDestination implements Destination { 052 /** 053 * The maximum number of messages to page in to the destination from 054 * persistent storage 055 */ 056 public static final int MAX_PAGE_SIZE = 200; 057 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 058 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 059 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 060 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 061 public static final int MAX_AUDIT_DEPTH = 10000; 062 063 protected final AtomicBoolean started = new AtomicBoolean(); 064 protected final ActiveMQDestination destination; 065 protected final Broker broker; 066 protected final MessageStore store; 067 protected SystemUsage systemUsage; 068 protected MemoryUsage memoryUsage; 069 private boolean producerFlowControl = true; 070 private boolean alwaysRetroactive = false; 071 protected long lastBlockedProducerWarnTime = 0l; 072 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 073 074 private int maxProducersToAudit = 1024; 075 private int maxAuditDepth = 2048; 076 private boolean enableAudit = true; 077 private int maxPageSize = MAX_PAGE_SIZE; 078 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 079 private boolean useCache = true; 080 private int minimumMessageSize = 1024; 081 private boolean lazyDispatch = false; 082 private boolean advisoryForSlowConsumers; 083 private boolean advisoryForFastProducers; 084 private boolean advisoryForDiscardingMessages; 085 private boolean advisoryWhenFull; 086 private boolean advisoryForDelivery; 087 private boolean advisoryForConsumed; 088 private boolean sendAdvisoryIfNoConsumers; 089 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 090 protected final BrokerService brokerService; 091 protected final Broker regionBroker; 092 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 093 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 094 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 095 protected int cursorMemoryHighWaterMark = 70; 096 protected int storeUsageHighWaterMark = 100; 097 private SlowConsumerStrategy slowConsumerStrategy; 098 private boolean prioritizedMessages; 099 private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 100 private boolean gcIfInactive; 101 private boolean gcWithNetworkConsumers; 102 private long lastActiveTime=0l; 103 private boolean reduceMemoryFootprint = false; 104 protected final Scheduler scheduler; 105 private boolean disposed = false; 106 private boolean doOptimzeMessageStorage = true; 107 /* 108 * percentage of in-flight messages above which optimize message store is disabled 109 */ 110 private int optimizeMessageStoreInFlightLimit = 10; 111 private boolean persistJMSRedelivered; 112 113 /** 114 * @param brokerService 115 * @param store 116 * @param destination 117 * @param parentStats 118 * @throws Exception 119 */ 120 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 121 this.brokerService = brokerService; 122 this.broker = brokerService.getBroker(); 123 this.store = store; 124 this.destination = destination; 125 // let's copy the enabled property from the parent DestinationStatistics 126 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 127 this.destinationStatistics.setParent(parentStats); 128 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 129 this.memoryUsage = this.systemUsage.getMemoryUsage(); 130 this.memoryUsage.setUsagePortion(1.0f); 131 this.regionBroker = brokerService.getRegionBroker(); 132 this.scheduler = brokerService.getBroker().getScheduler(); 133 } 134 135 /** 136 * initialize the destination 137 * 138 * @throws Exception 139 */ 140 public void initialize() throws Exception { 141 // Let the store know what usage manager we are using so that he can 142 // flush messages to disk when usage gets high. 143 if (store != null) { 144 store.setMemoryUsage(this.memoryUsage); 145 } 146 } 147 148 /** 149 * @return the producerFlowControl 150 */ 151 @Override 152 public boolean isProducerFlowControl() { 153 return producerFlowControl; 154 } 155 156 /** 157 * @param producerFlowControl the producerFlowControl to set 158 */ 159 @Override 160 public void setProducerFlowControl(boolean producerFlowControl) { 161 this.producerFlowControl = producerFlowControl; 162 } 163 164 @Override 165 public boolean isAlwaysRetroactive() { 166 return alwaysRetroactive; 167 } 168 169 @Override 170 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 171 this.alwaysRetroactive = alwaysRetroactive; 172 } 173 174 /** 175 * Set's the interval at which warnings about producers being blocked by 176 * resource usage will be triggered. Values of 0 or less will disable 177 * warnings 178 * 179 * @param blockedProducerWarningInterval the interval at which warning about 180 * blocked producers will be triggered. 181 */ 182 @Override 183 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 184 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 185 } 186 187 /** 188 * 189 * @return the interval at which warning about blocked producers will be 190 * triggered. 191 */ 192 @Override 193 public long getBlockedProducerWarningInterval() { 194 return blockedProducerWarningInterval; 195 } 196 197 /** 198 * @return the maxProducersToAudit 199 */ 200 @Override 201 public int getMaxProducersToAudit() { 202 return maxProducersToAudit; 203 } 204 205 /** 206 * @param maxProducersToAudit the maxProducersToAudit to set 207 */ 208 @Override 209 public void setMaxProducersToAudit(int maxProducersToAudit) { 210 this.maxProducersToAudit = maxProducersToAudit; 211 } 212 213 /** 214 * @return the maxAuditDepth 215 */ 216 @Override 217 public int getMaxAuditDepth() { 218 return maxAuditDepth; 219 } 220 221 /** 222 * @param maxAuditDepth the maxAuditDepth to set 223 */ 224 @Override 225 public void setMaxAuditDepth(int maxAuditDepth) { 226 this.maxAuditDepth = maxAuditDepth; 227 } 228 229 /** 230 * @return the enableAudit 231 */ 232 @Override 233 public boolean isEnableAudit() { 234 return enableAudit; 235 } 236 237 /** 238 * @param enableAudit the enableAudit to set 239 */ 240 @Override 241 public void setEnableAudit(boolean enableAudit) { 242 this.enableAudit = enableAudit; 243 } 244 245 @Override 246 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 247 destinationStatistics.getProducers().increment(); 248 this.lastActiveTime=0l; 249 } 250 251 @Override 252 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 253 destinationStatistics.getProducers().decrement(); 254 } 255 256 @Override 257 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 258 destinationStatistics.getConsumers().increment(); 259 this.lastActiveTime=0l; 260 } 261 262 @Override 263 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 264 destinationStatistics.getConsumers().decrement(); 265 this.lastActiveTime=0l; 266 } 267 268 269 @Override 270 public final MemoryUsage getMemoryUsage() { 271 return memoryUsage; 272 } 273 274 @Override 275 public void setMemoryUsage(MemoryUsage memoryUsage) { 276 this.memoryUsage = memoryUsage; 277 } 278 279 @Override 280 public DestinationStatistics getDestinationStatistics() { 281 return destinationStatistics; 282 } 283 284 @Override 285 public ActiveMQDestination getActiveMQDestination() { 286 return destination; 287 } 288 289 @Override 290 public final String getName() { 291 return getActiveMQDestination().getPhysicalName(); 292 } 293 294 @Override 295 public final MessageStore getMessageStore() { 296 return store; 297 } 298 299 @Override 300 public boolean isActive() { 301 boolean isActive = destinationStatistics.getConsumers().getCount() > 0 || 302 destinationStatistics.getProducers().getCount() > 0; 303 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) { 304 isActive = hasRegularConsumers(getConsumers()); 305 } 306 return isActive; 307 } 308 309 @Override 310 public int getMaxPageSize() { 311 return maxPageSize; 312 } 313 314 @Override 315 public void setMaxPageSize(int maxPageSize) { 316 this.maxPageSize = maxPageSize; 317 } 318 319 @Override 320 public int getMaxBrowsePageSize() { 321 return this.maxBrowsePageSize; 322 } 323 324 @Override 325 public void setMaxBrowsePageSize(int maxPageSize) { 326 this.maxBrowsePageSize = maxPageSize; 327 } 328 329 public int getMaxExpirePageSize() { 330 return this.maxExpirePageSize; 331 } 332 333 public void setMaxExpirePageSize(int maxPageSize) { 334 this.maxExpirePageSize = maxPageSize; 335 } 336 337 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 338 this.expireMessagesPeriod = expireMessagesPeriod; 339 } 340 341 public long getExpireMessagesPeriod() { 342 return expireMessagesPeriod; 343 } 344 345 @Override 346 public boolean isUseCache() { 347 return useCache; 348 } 349 350 @Override 351 public void setUseCache(boolean useCache) { 352 this.useCache = useCache; 353 } 354 355 @Override 356 public int getMinimumMessageSize() { 357 return minimumMessageSize; 358 } 359 360 @Override 361 public void setMinimumMessageSize(int minimumMessageSize) { 362 this.minimumMessageSize = minimumMessageSize; 363 } 364 365 @Override 366 public boolean isLazyDispatch() { 367 return lazyDispatch; 368 } 369 370 @Override 371 public void setLazyDispatch(boolean lazyDispatch) { 372 this.lazyDispatch = lazyDispatch; 373 } 374 375 protected long getDestinationSequenceId() { 376 return regionBroker.getBrokerSequenceId(); 377 } 378 379 /** 380 * @return the advisoryForSlowConsumers 381 */ 382 public boolean isAdvisoryForSlowConsumers() { 383 return advisoryForSlowConsumers; 384 } 385 386 /** 387 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 388 */ 389 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 390 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 391 } 392 393 /** 394 * @return the advisoryForDiscardingMessages 395 */ 396 public boolean isAdvisoryForDiscardingMessages() { 397 return advisoryForDiscardingMessages; 398 } 399 400 /** 401 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 402 * set 403 */ 404 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 405 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 406 } 407 408 /** 409 * @return the advisoryWhenFull 410 */ 411 public boolean isAdvisoryWhenFull() { 412 return advisoryWhenFull; 413 } 414 415 /** 416 * @param advisoryWhenFull the advisoryWhenFull to set 417 */ 418 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 419 this.advisoryWhenFull = advisoryWhenFull; 420 } 421 422 /** 423 * @return the advisoryForDelivery 424 */ 425 public boolean isAdvisoryForDelivery() { 426 return advisoryForDelivery; 427 } 428 429 /** 430 * @param advisoryForDelivery the advisoryForDelivery to set 431 */ 432 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 433 this.advisoryForDelivery = advisoryForDelivery; 434 } 435 436 /** 437 * @return the advisoryForConsumed 438 */ 439 public boolean isAdvisoryForConsumed() { 440 return advisoryForConsumed; 441 } 442 443 /** 444 * @param advisoryForConsumed the advisoryForConsumed to set 445 */ 446 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 447 this.advisoryForConsumed = advisoryForConsumed; 448 } 449 450 /** 451 * @return the advisdoryForFastProducers 452 */ 453 public boolean isAdvisoryForFastProducers() { 454 return advisoryForFastProducers; 455 } 456 457 /** 458 * @param advisoryForFastProducers the advisdoryForFastProducers to set 459 */ 460 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 461 this.advisoryForFastProducers = advisoryForFastProducers; 462 } 463 464 public boolean isSendAdvisoryIfNoConsumers() { 465 return sendAdvisoryIfNoConsumers; 466 } 467 468 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 469 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 470 } 471 472 /** 473 * @return the dead letter strategy 474 */ 475 @Override 476 public DeadLetterStrategy getDeadLetterStrategy() { 477 return deadLetterStrategy; 478 } 479 480 /** 481 * set the dead letter strategy 482 * 483 * @param deadLetterStrategy 484 */ 485 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 486 this.deadLetterStrategy = deadLetterStrategy; 487 } 488 489 @Override 490 public int getCursorMemoryHighWaterMark() { 491 return this.cursorMemoryHighWaterMark; 492 } 493 494 @Override 495 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 496 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 497 } 498 499 /** 500 * called when message is consumed 501 * 502 * @param context 503 * @param messageReference 504 */ 505 @Override 506 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 507 if (advisoryForConsumed) { 508 broker.messageConsumed(context, messageReference); 509 } 510 } 511 512 /** 513 * Called when message is delivered to the broker 514 * 515 * @param context 516 * @param messageReference 517 */ 518 @Override 519 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 520 this.lastActiveTime = 0L; 521 if (advisoryForDelivery) { 522 broker.messageDelivered(context, messageReference); 523 } 524 } 525 526 /** 527 * Called when a message is discarded - e.g. running low on memory This will 528 * happen only if the policy is enabled - e.g. non durable topics 529 * 530 * @param context 531 * @param messageReference 532 */ 533 @Override 534 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 535 if (advisoryForDiscardingMessages) { 536 broker.messageDiscarded(context, sub, messageReference); 537 } 538 } 539 540 /** 541 * Called when there is a slow consumer 542 * 543 * @param context 544 * @param subs 545 */ 546 @Override 547 public void slowConsumer(ConnectionContext context, Subscription subs) { 548 if (advisoryForSlowConsumers) { 549 broker.slowConsumer(context, this, subs); 550 } 551 if (slowConsumerStrategy != null) { 552 slowConsumerStrategy.slowConsumer(context, subs); 553 } 554 } 555 556 /** 557 * Called to notify a producer is too fast 558 * 559 * @param context 560 * @param producerInfo 561 */ 562 @Override 563 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 564 if (advisoryForFastProducers) { 565 broker.fastProducer(context, producerInfo, getActiveMQDestination()); 566 } 567 } 568 569 /** 570 * Called when a Usage reaches a limit 571 * 572 * @param context 573 * @param usage 574 */ 575 @Override 576 public void isFull(ConnectionContext context, Usage<?> usage) { 577 if (advisoryWhenFull) { 578 broker.isFull(context, this, usage); 579 } 580 } 581 582 @Override 583 public void dispose(ConnectionContext context) throws IOException { 584 if (this.store != null) { 585 this.store.removeAllMessages(context); 586 this.store.dispose(context); 587 } 588 this.destinationStatistics.setParent(null); 589 this.memoryUsage.stop(); 590 this.disposed = true; 591 } 592 593 @Override 594 public boolean isDisposed() { 595 return this.disposed; 596 } 597 598 /** 599 * Provides a hook to allow messages with no consumer to be processed in 600 * some way - such as to send to a dead letter queue or something.. 601 */ 602 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 603 if (!msg.isPersistent()) { 604 if (isSendAdvisoryIfNoConsumers()) { 605 // allow messages with no consumers to be dispatched to a dead 606 // letter queue 607 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 608 609 Message message = msg.copy(); 610 // The original destination and transaction id do not get 611 // filled when the message is first sent, 612 // it is only populated if the message is routed to another 613 // destination like the DLQ 614 if (message.getOriginalDestination() != null) { 615 message.setOriginalDestination(message.getDestination()); 616 } 617 if (message.getOriginalTransactionId() != null) { 618 message.setOriginalTransactionId(message.getTransactionId()); 619 } 620 621 ActiveMQTopic advisoryTopic; 622 if (destination.isQueue()) { 623 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 624 } else { 625 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 626 } 627 message.setDestination(advisoryTopic); 628 message.setTransactionId(null); 629 630 // Disable flow control for this since since we don't want 631 // to block. 632 boolean originalFlowControl = context.isProducerFlowControl(); 633 try { 634 context.setProducerFlowControl(false); 635 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 636 producerExchange.setMutable(false); 637 producerExchange.setConnectionContext(context); 638 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 639 context.getBroker().send(producerExchange, message); 640 } finally { 641 context.setProducerFlowControl(originalFlowControl); 642 } 643 644 } 645 } 646 } 647 } 648 649 @Override 650 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 651 } 652 653 public final int getStoreUsageHighWaterMark() { 654 return this.storeUsageHighWaterMark; 655 } 656 657 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 658 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 659 } 660 661 protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 662 waitForSpace(context, producerBrokerExchange, usage, 100, warning); 663 } 664 665 protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 666 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 667 if (isFlowControlLogRequired()) { 668 getLog().info("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 669 } else { 670 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 671 } 672 throw new ResourceAllocationException(warning); 673 } 674 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 675 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 676 if (isFlowControlLogRequired()) { 677 getLog().info("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 678 } else { 679 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 680 } 681 throw new ResourceAllocationException(warning); 682 } 683 } else { 684 long start = System.currentTimeMillis(); 685 producerBrokerExchange.blockingOnFlowControl(true); 686 destinationStatistics.getBlockedSends().increment(); 687 while (!usage.waitForSpace(1000, highWaterMark)) { 688 if (context.getStopping().get()) { 689 throw new IOException("Connection closed, send aborted."); 690 } 691 692 if (isFlowControlLogRequired()) { 693 getLog().warn("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); 694 } else { 695 getLog().debug("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); 696 } 697 } 698 long finish = System.currentTimeMillis(); 699 long totalTimeBlocked = finish - start; 700 destinationStatistics.getBlockedTime().addTime(totalTimeBlocked); 701 producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked); 702 producerBrokerExchange.blockingOnFlowControl(false); 703 } 704 } 705 706 protected boolean isFlowControlLogRequired() { 707 boolean answer = false; 708 if (blockedProducerWarningInterval > 0) { 709 long now = System.currentTimeMillis(); 710 if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) { 711 lastBlockedProducerWarnTime = now; 712 answer = true; 713 } 714 } 715 return answer; 716 } 717 718 protected abstract Logger getLog(); 719 720 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 721 this.slowConsumerStrategy = slowConsumerStrategy; 722 } 723 724 @Override 725 public SlowConsumerStrategy getSlowConsumerStrategy() { 726 return this.slowConsumerStrategy; 727 } 728 729 730 @Override 731 public boolean isPrioritizedMessages() { 732 return this.prioritizedMessages; 733 } 734 735 public void setPrioritizedMessages(boolean prioritizedMessages) { 736 this.prioritizedMessages = prioritizedMessages; 737 if (store != null) { 738 store.setPrioritizedMessages(prioritizedMessages); 739 } 740 } 741 742 /** 743 * @return the inactiveTimeoutBeforeGC 744 */ 745 @Override 746 public long getInactiveTimeoutBeforeGC() { 747 return this.inactiveTimeoutBeforeGC; 748 } 749 750 /** 751 * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set 752 */ 753 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 754 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 755 } 756 757 /** 758 * @return the gcIfInactive 759 */ 760 public boolean isGcIfInactive() { 761 return this.gcIfInactive; 762 } 763 764 /** 765 * @param gcIfInactive the gcIfInactive to set 766 */ 767 public void setGcIfInactive(boolean gcIfInactive) { 768 this.gcIfInactive = gcIfInactive; 769 } 770 771 /** 772 * Indicate if it is ok to gc destinations that have only network consumers 773 * @param gcWithNetworkConsumers 774 */ 775 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 776 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 777 } 778 779 public boolean isGcWithNetworkConsumers() { 780 return gcWithNetworkConsumers; 781 } 782 783 @Override 784 public void markForGC(long timeStamp) { 785 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 786 && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { 787 this.lastActiveTime = timeStamp; 788 } 789 } 790 791 @Override 792 public boolean canGC() { 793 boolean result = false; 794 final long currentLastActiveTime = this.lastActiveTime; 795 if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) { 796 if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) { 797 result = true; 798 } 799 } 800 return result; 801 } 802 803 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 804 this.reduceMemoryFootprint = reduceMemoryFootprint; 805 } 806 807 protected boolean isReduceMemoryFootprint() { 808 return this.reduceMemoryFootprint; 809 } 810 811 @Override 812 public boolean isDoOptimzeMessageStorage() { 813 return doOptimzeMessageStorage; 814 } 815 816 @Override 817 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 818 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 819 } 820 821 public int getOptimizeMessageStoreInFlightLimit() { 822 return optimizeMessageStoreInFlightLimit; 823 } 824 825 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 826 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 827 } 828 829 830 @Override 831 public abstract List<Subscription> getConsumers(); 832 833 protected boolean hasRegularConsumers(List<Subscription> consumers) { 834 boolean hasRegularConsumers = false; 835 for (Subscription subscription: consumers) { 836 if (!subscription.getConsumerInfo().isNetworkSubscription()) { 837 hasRegularConsumers = true; 838 break; 839 } 840 } 841 return hasRegularConsumers; 842 } 843 844 public ConnectionContext createConnectionContext() { 845 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); 846 answer.setBroker(this.broker); 847 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 848 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 849 return answer; 850 } 851 852 protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) { 853 // the original ack may be a ranged ack, but we are trying to delete 854 // a specific 855 // message store here so we need to convert to a non ranged ack. 856 if (ack.getMessageCount() > 0) { 857 // Dup the ack 858 MessageAck a = new MessageAck(); 859 ack.copy(a); 860 ack = a; 861 // Convert to non-ranged. 862 ack.setMessageCount(1); 863 } 864 // always use node messageId so we can access entry/data Location 865 ack.setFirstMessageId(node.getMessageId()); 866 ack.setLastMessageId(node.getMessageId()); 867 return ack; 868 } 869 870 protected boolean isDLQ() { 871 return destination.isDLQ(); 872 } 873 874 @Override 875 public void duplicateFromStore(Message message, Subscription durableSub) { 876 ConnectionContext connectionContext = createConnectionContext(); 877 getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); 878 Throwable cause = new Throwable("duplicate from store for " + destination); 879 message.setRegionDestination(this); 880 broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); 881 MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); 882 messageAck.setPoisonCause(cause); 883 try { 884 acknowledge(connectionContext, durableSub, messageAck, message); 885 } catch (IOException e) { 886 getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); 887 } 888 } 889 890 public void setPersistJMSRedelivered(boolean persistJMSRedelivered) { 891 this.persistJMSRedelivered = persistJMSRedelivered; 892 } 893 894 public boolean isPersistJMSRedelivered() { 895 return persistJMSRedelivered; 896 } 897 898 public SystemUsage getSystemUsage() { 899 return systemUsage; 900 } 901}