001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import javax.jms.ResourceAllocationException; 024 025import org.apache.activemq.advisory.AdvisorySupport; 026import org.apache.activemq.broker.Broker; 027import org.apache.activemq.broker.BrokerService; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.broker.ProducerBrokerExchange; 030import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 031import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageDispatchNotification; 037import org.apache.activemq.command.ProducerInfo; 038import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 039import org.apache.activemq.security.SecurityContext; 040import org.apache.activemq.state.ProducerState; 041import org.apache.activemq.store.MessageStore; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.usage.MemoryUsage; 044import org.apache.activemq.usage.SystemUsage; 045import org.apache.activemq.usage.Usage; 046import org.slf4j.Logger; 047 048/** 049 * 050 */ 051public abstract class BaseDestination implements Destination { 052 /** 053 * The maximum number of messages to page in to the destination from 054 * persistent storage 055 */ 056 public static final int MAX_PAGE_SIZE = 200; 057 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 058 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 059 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 060 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 061 public static final int MAX_AUDIT_DEPTH = 10000; 062 public static final String DUPLICATE_FROM_STORE_MSG_PREFIX = "duplicate from store for "; 063 064 protected final AtomicBoolean started = new AtomicBoolean(); 065 protected final ActiveMQDestination destination; 066 protected final Broker broker; 067 protected final MessageStore store; 068 protected SystemUsage systemUsage; 069 protected MemoryUsage memoryUsage; 070 private boolean producerFlowControl = true; 071 private boolean alwaysRetroactive = false; 072 protected long lastBlockedProducerWarnTime = 0l; 073 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 074 075 private int maxProducersToAudit = 1024; 076 private int maxAuditDepth = 2048; 077 private boolean enableAudit = true; 078 private int maxPageSize = MAX_PAGE_SIZE; 079 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 080 private boolean useCache = true; 081 private int minimumMessageSize = 1024; 082 private boolean lazyDispatch = false; 083 private boolean advisoryForSlowConsumers; 084 private boolean advisoryForFastProducers; 085 private boolean advisoryForDiscardingMessages; 086 private boolean advisoryWhenFull; 087 private boolean advisoryForDelivery; 088 private boolean advisoryForConsumed; 089 private boolean sendAdvisoryIfNoConsumers; 090 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 091 protected final BrokerService brokerService; 092 protected final Broker regionBroker; 093 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 094 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 095 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 096 protected int cursorMemoryHighWaterMark = 70; 097 protected int storeUsageHighWaterMark = 100; 098 private SlowConsumerStrategy slowConsumerStrategy; 099 private boolean prioritizedMessages; 100 private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 101 private boolean gcIfInactive; 102 private boolean gcWithNetworkConsumers; 103 private long lastActiveTime=0l; 104 private boolean reduceMemoryFootprint = false; 105 protected final Scheduler scheduler; 106 private boolean disposed = false; 107 private boolean doOptimzeMessageStorage = true; 108 /* 109 * percentage of in-flight messages above which optimize message store is disabled 110 */ 111 private int optimizeMessageStoreInFlightLimit = 10; 112 private boolean persistJMSRedelivered; 113 114 /** 115 * @param brokerService 116 * @param store 117 * @param destination 118 * @param parentStats 119 * @throws Exception 120 */ 121 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 122 this.brokerService = brokerService; 123 this.broker = brokerService.getBroker(); 124 this.store = store; 125 this.destination = destination; 126 // let's copy the enabled property from the parent DestinationStatistics 127 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 128 this.destinationStatistics.setParent(parentStats); 129 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 130 this.memoryUsage = this.systemUsage.getMemoryUsage(); 131 this.memoryUsage.setUsagePortion(1.0f); 132 this.regionBroker = brokerService.getRegionBroker(); 133 this.scheduler = brokerService.getBroker().getScheduler(); 134 } 135 136 /** 137 * initialize the destination 138 * 139 * @throws Exception 140 */ 141 public void initialize() throws Exception { 142 // Let the store know what usage manager we are using so that he can 143 // flush messages to disk when usage gets high. 144 if (store != null) { 145 store.setMemoryUsage(this.memoryUsage); 146 } 147 } 148 149 /** 150 * @return the producerFlowControl 151 */ 152 @Override 153 public boolean isProducerFlowControl() { 154 return producerFlowControl; 155 } 156 157 /** 158 * @param producerFlowControl the producerFlowControl to set 159 */ 160 @Override 161 public void setProducerFlowControl(boolean producerFlowControl) { 162 this.producerFlowControl = producerFlowControl; 163 } 164 165 @Override 166 public boolean isAlwaysRetroactive() { 167 return alwaysRetroactive; 168 } 169 170 @Override 171 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 172 this.alwaysRetroactive = alwaysRetroactive; 173 } 174 175 /** 176 * Set's the interval at which warnings about producers being blocked by 177 * resource usage will be triggered. Values of 0 or less will disable 178 * warnings 179 * 180 * @param blockedProducerWarningInterval the interval at which warning about 181 * blocked producers will be triggered. 182 */ 183 @Override 184 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 185 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 186 } 187 188 /** 189 * 190 * @return the interval at which warning about blocked producers will be 191 * triggered. 192 */ 193 @Override 194 public long getBlockedProducerWarningInterval() { 195 return blockedProducerWarningInterval; 196 } 197 198 /** 199 * @return the maxProducersToAudit 200 */ 201 @Override 202 public int getMaxProducersToAudit() { 203 return maxProducersToAudit; 204 } 205 206 /** 207 * @param maxProducersToAudit the maxProducersToAudit to set 208 */ 209 @Override 210 public void setMaxProducersToAudit(int maxProducersToAudit) { 211 this.maxProducersToAudit = maxProducersToAudit; 212 } 213 214 /** 215 * @return the maxAuditDepth 216 */ 217 @Override 218 public int getMaxAuditDepth() { 219 return maxAuditDepth; 220 } 221 222 /** 223 * @param maxAuditDepth the maxAuditDepth to set 224 */ 225 @Override 226 public void setMaxAuditDepth(int maxAuditDepth) { 227 this.maxAuditDepth = maxAuditDepth; 228 } 229 230 /** 231 * @return the enableAudit 232 */ 233 @Override 234 public boolean isEnableAudit() { 235 return enableAudit; 236 } 237 238 /** 239 * @param enableAudit the enableAudit to set 240 */ 241 @Override 242 public void setEnableAudit(boolean enableAudit) { 243 this.enableAudit = enableAudit; 244 } 245 246 @Override 247 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 248 destinationStatistics.getProducers().increment(); 249 this.lastActiveTime=0l; 250 } 251 252 @Override 253 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 254 destinationStatistics.getProducers().decrement(); 255 } 256 257 @Override 258 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 259 destinationStatistics.getConsumers().increment(); 260 this.lastActiveTime=0l; 261 } 262 263 @Override 264 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 265 destinationStatistics.getConsumers().decrement(); 266 this.lastActiveTime=0l; 267 } 268 269 270 @Override 271 public final MemoryUsage getMemoryUsage() { 272 return memoryUsage; 273 } 274 275 @Override 276 public void setMemoryUsage(MemoryUsage memoryUsage) { 277 this.memoryUsage = memoryUsage; 278 } 279 280 @Override 281 public DestinationStatistics getDestinationStatistics() { 282 return destinationStatistics; 283 } 284 285 @Override 286 public ActiveMQDestination getActiveMQDestination() { 287 return destination; 288 } 289 290 @Override 291 public final String getName() { 292 return getActiveMQDestination().getPhysicalName(); 293 } 294 295 @Override 296 public final MessageStore getMessageStore() { 297 return store; 298 } 299 300 @Override 301 public boolean isActive() { 302 boolean isActive = destinationStatistics.getConsumers().getCount() > 0 || 303 destinationStatistics.getProducers().getCount() > 0; 304 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) { 305 isActive = hasRegularConsumers(getConsumers()); 306 } 307 return isActive; 308 } 309 310 @Override 311 public int getMaxPageSize() { 312 return maxPageSize; 313 } 314 315 @Override 316 public void setMaxPageSize(int maxPageSize) { 317 this.maxPageSize = maxPageSize; 318 } 319 320 @Override 321 public int getMaxBrowsePageSize() { 322 return this.maxBrowsePageSize; 323 } 324 325 @Override 326 public void setMaxBrowsePageSize(int maxPageSize) { 327 this.maxBrowsePageSize = maxPageSize; 328 } 329 330 public int getMaxExpirePageSize() { 331 return this.maxExpirePageSize; 332 } 333 334 public void setMaxExpirePageSize(int maxPageSize) { 335 this.maxExpirePageSize = maxPageSize; 336 } 337 338 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 339 this.expireMessagesPeriod = expireMessagesPeriod; 340 } 341 342 public long getExpireMessagesPeriod() { 343 return expireMessagesPeriod; 344 } 345 346 @Override 347 public boolean isUseCache() { 348 return useCache; 349 } 350 351 @Override 352 public void setUseCache(boolean useCache) { 353 this.useCache = useCache; 354 } 355 356 @Override 357 public int getMinimumMessageSize() { 358 return minimumMessageSize; 359 } 360 361 @Override 362 public void setMinimumMessageSize(int minimumMessageSize) { 363 this.minimumMessageSize = minimumMessageSize; 364 } 365 366 @Override 367 public boolean isLazyDispatch() { 368 return lazyDispatch; 369 } 370 371 @Override 372 public void setLazyDispatch(boolean lazyDispatch) { 373 this.lazyDispatch = lazyDispatch; 374 } 375 376 protected long getDestinationSequenceId() { 377 return regionBroker.getBrokerSequenceId(); 378 } 379 380 /** 381 * @return the advisoryForSlowConsumers 382 */ 383 public boolean isAdvisoryForSlowConsumers() { 384 return advisoryForSlowConsumers; 385 } 386 387 /** 388 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 389 */ 390 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 391 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 392 } 393 394 /** 395 * @return the advisoryForDiscardingMessages 396 */ 397 public boolean isAdvisoryForDiscardingMessages() { 398 return advisoryForDiscardingMessages; 399 } 400 401 /** 402 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 403 * set 404 */ 405 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 406 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 407 } 408 409 /** 410 * @return the advisoryWhenFull 411 */ 412 public boolean isAdvisoryWhenFull() { 413 return advisoryWhenFull; 414 } 415 416 /** 417 * @param advisoryWhenFull the advisoryWhenFull to set 418 */ 419 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 420 this.advisoryWhenFull = advisoryWhenFull; 421 } 422 423 /** 424 * @return the advisoryForDelivery 425 */ 426 public boolean isAdvisoryForDelivery() { 427 return advisoryForDelivery; 428 } 429 430 /** 431 * @param advisoryForDelivery the advisoryForDelivery to set 432 */ 433 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 434 this.advisoryForDelivery = advisoryForDelivery; 435 } 436 437 /** 438 * @return the advisoryForConsumed 439 */ 440 public boolean isAdvisoryForConsumed() { 441 return advisoryForConsumed; 442 } 443 444 /** 445 * @param advisoryForConsumed the advisoryForConsumed to set 446 */ 447 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 448 this.advisoryForConsumed = advisoryForConsumed; 449 } 450 451 /** 452 * @return the advisdoryForFastProducers 453 */ 454 public boolean isAdvisoryForFastProducers() { 455 return advisoryForFastProducers; 456 } 457 458 /** 459 * @param advisoryForFastProducers the advisdoryForFastProducers to set 460 */ 461 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 462 this.advisoryForFastProducers = advisoryForFastProducers; 463 } 464 465 public boolean isSendAdvisoryIfNoConsumers() { 466 return sendAdvisoryIfNoConsumers; 467 } 468 469 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 470 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 471 } 472 473 /** 474 * @return the dead letter strategy 475 */ 476 @Override 477 public DeadLetterStrategy getDeadLetterStrategy() { 478 return deadLetterStrategy; 479 } 480 481 /** 482 * set the dead letter strategy 483 * 484 * @param deadLetterStrategy 485 */ 486 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 487 this.deadLetterStrategy = deadLetterStrategy; 488 } 489 490 @Override 491 public int getCursorMemoryHighWaterMark() { 492 return this.cursorMemoryHighWaterMark; 493 } 494 495 @Override 496 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 497 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 498 } 499 500 /** 501 * called when message is consumed 502 * 503 * @param context 504 * @param messageReference 505 */ 506 @Override 507 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 508 if (advisoryForConsumed) { 509 broker.messageConsumed(context, messageReference); 510 } 511 } 512 513 /** 514 * Called when message is delivered to the broker 515 * 516 * @param context 517 * @param messageReference 518 */ 519 @Override 520 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 521 this.lastActiveTime = 0L; 522 if (advisoryForDelivery) { 523 broker.messageDelivered(context, messageReference); 524 } 525 } 526 527 /** 528 * Called when a message is discarded - e.g. running low on memory This will 529 * happen only if the policy is enabled - e.g. non durable topics 530 * 531 * @param context 532 * @param messageReference 533 */ 534 @Override 535 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 536 if (advisoryForDiscardingMessages) { 537 broker.messageDiscarded(context, sub, messageReference); 538 } 539 } 540 541 /** 542 * Called when there is a slow consumer 543 * 544 * @param context 545 * @param subs 546 */ 547 @Override 548 public void slowConsumer(ConnectionContext context, Subscription subs) { 549 if (advisoryForSlowConsumers) { 550 broker.slowConsumer(context, this, subs); 551 } 552 if (slowConsumerStrategy != null) { 553 slowConsumerStrategy.slowConsumer(context, subs); 554 } 555 } 556 557 /** 558 * Called to notify a producer is too fast 559 * 560 * @param context 561 * @param producerInfo 562 */ 563 @Override 564 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 565 if (advisoryForFastProducers) { 566 broker.fastProducer(context, producerInfo, getActiveMQDestination()); 567 } 568 } 569 570 /** 571 * Called when a Usage reaches a limit 572 * 573 * @param context 574 * @param usage 575 */ 576 @Override 577 public void isFull(ConnectionContext context, Usage<?> usage) { 578 if (advisoryWhenFull) { 579 broker.isFull(context, this, usage); 580 } 581 } 582 583 @Override 584 public void dispose(ConnectionContext context) throws IOException { 585 if (this.store != null) { 586 this.store.removeAllMessages(context); 587 this.store.dispose(context); 588 } 589 this.destinationStatistics.setParent(null); 590 this.memoryUsage.stop(); 591 this.disposed = true; 592 } 593 594 @Override 595 public boolean isDisposed() { 596 return this.disposed; 597 } 598 599 /** 600 * Provides a hook to allow messages with no consumer to be processed in 601 * some way - such as to send to a dead letter queue or something.. 602 */ 603 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 604 if (!msg.isPersistent()) { 605 if (isSendAdvisoryIfNoConsumers()) { 606 // allow messages with no consumers to be dispatched to a dead 607 // letter queue 608 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 609 610 Message message = msg.copy(); 611 // The original destination and transaction id do not get 612 // filled when the message is first sent, 613 // it is only populated if the message is routed to another 614 // destination like the DLQ 615 if (message.getOriginalDestination() != null) { 616 message.setOriginalDestination(message.getDestination()); 617 } 618 if (message.getOriginalTransactionId() != null) { 619 message.setOriginalTransactionId(message.getTransactionId()); 620 } 621 622 ActiveMQTopic advisoryTopic; 623 if (destination.isQueue()) { 624 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 625 } else { 626 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 627 } 628 message.setDestination(advisoryTopic); 629 message.setTransactionId(null); 630 631 // Disable flow control for this since since we don't want 632 // to block. 633 boolean originalFlowControl = context.isProducerFlowControl(); 634 try { 635 context.setProducerFlowControl(false); 636 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 637 producerExchange.setMutable(false); 638 producerExchange.setConnectionContext(context); 639 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 640 context.getBroker().send(producerExchange, message); 641 } finally { 642 context.setProducerFlowControl(originalFlowControl); 643 } 644 645 } 646 } 647 } 648 } 649 650 @Override 651 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 652 } 653 654 public final int getStoreUsageHighWaterMark() { 655 return this.storeUsageHighWaterMark; 656 } 657 658 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 659 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 660 } 661 662 protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 663 waitForSpace(context, producerBrokerExchange, usage, 100, warning); 664 } 665 666 protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 667 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 668 if (isFlowControlLogRequired()) { 669 getLog().info("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 670 } else { 671 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 672 } 673 throw new ResourceAllocationException(warning); 674 } 675 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 676 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 677 if (isFlowControlLogRequired()) { 678 getLog().info("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 679 } else { 680 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 681 } 682 throw new ResourceAllocationException(warning); 683 } 684 } else { 685 long start = System.currentTimeMillis(); 686 producerBrokerExchange.blockingOnFlowControl(true); 687 destinationStatistics.getBlockedSends().increment(); 688 while (!usage.waitForSpace(1000, highWaterMark)) { 689 if (context.getStopping().get()) { 690 throw new IOException("Connection closed, send aborted."); 691 } 692 693 if (isFlowControlLogRequired()) { 694 getLog().warn("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); 695 } else { 696 getLog().debug("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))}); 697 } 698 } 699 long finish = System.currentTimeMillis(); 700 long totalTimeBlocked = finish - start; 701 destinationStatistics.getBlockedTime().addTime(totalTimeBlocked); 702 producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked); 703 producerBrokerExchange.blockingOnFlowControl(false); 704 } 705 } 706 707 protected boolean isFlowControlLogRequired() { 708 boolean answer = false; 709 if (blockedProducerWarningInterval > 0) { 710 long now = System.currentTimeMillis(); 711 if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) { 712 lastBlockedProducerWarnTime = now; 713 answer = true; 714 } 715 } 716 return answer; 717 } 718 719 protected abstract Logger getLog(); 720 721 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 722 this.slowConsumerStrategy = slowConsumerStrategy; 723 } 724 725 @Override 726 public SlowConsumerStrategy getSlowConsumerStrategy() { 727 return this.slowConsumerStrategy; 728 } 729 730 731 @Override 732 public boolean isPrioritizedMessages() { 733 return this.prioritizedMessages; 734 } 735 736 public void setPrioritizedMessages(boolean prioritizedMessages) { 737 this.prioritizedMessages = prioritizedMessages; 738 if (store != null) { 739 store.setPrioritizedMessages(prioritizedMessages); 740 } 741 } 742 743 /** 744 * @return the inactiveTimeoutBeforeGC 745 */ 746 @Override 747 public long getInactiveTimeoutBeforeGC() { 748 return this.inactiveTimeoutBeforeGC; 749 } 750 751 /** 752 * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set 753 */ 754 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 755 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 756 } 757 758 /** 759 * @return the gcIfInactive 760 */ 761 public boolean isGcIfInactive() { 762 return this.gcIfInactive; 763 } 764 765 /** 766 * @param gcIfInactive the gcIfInactive to set 767 */ 768 public void setGcIfInactive(boolean gcIfInactive) { 769 this.gcIfInactive = gcIfInactive; 770 } 771 772 /** 773 * Indicate if it is ok to gc destinations that have only network consumers 774 * @param gcWithNetworkConsumers 775 */ 776 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 777 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 778 } 779 780 public boolean isGcWithNetworkConsumers() { 781 return gcWithNetworkConsumers; 782 } 783 784 @Override 785 public void markForGC(long timeStamp) { 786 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 787 && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { 788 this.lastActiveTime = timeStamp; 789 } 790 } 791 792 @Override 793 public boolean canGC() { 794 boolean result = false; 795 final long currentLastActiveTime = this.lastActiveTime; 796 if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) { 797 if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) { 798 result = true; 799 } 800 } 801 return result; 802 } 803 804 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 805 this.reduceMemoryFootprint = reduceMemoryFootprint; 806 } 807 808 protected boolean isReduceMemoryFootprint() { 809 return this.reduceMemoryFootprint; 810 } 811 812 @Override 813 public boolean isDoOptimzeMessageStorage() { 814 return doOptimzeMessageStorage; 815 } 816 817 @Override 818 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 819 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 820 } 821 822 public int getOptimizeMessageStoreInFlightLimit() { 823 return optimizeMessageStoreInFlightLimit; 824 } 825 826 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 827 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 828 } 829 830 831 @Override 832 public abstract List<Subscription> getConsumers(); 833 834 protected boolean hasRegularConsumers(List<Subscription> consumers) { 835 boolean hasRegularConsumers = false; 836 for (Subscription subscription: consumers) { 837 if (!subscription.getConsumerInfo().isNetworkSubscription()) { 838 hasRegularConsumers = true; 839 break; 840 } 841 } 842 return hasRegularConsumers; 843 } 844 845 public ConnectionContext createConnectionContext() { 846 ConnectionContext answer = new ConnectionContext(); 847 answer.setBroker(this.broker); 848 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 849 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 850 return answer; 851 } 852 853 protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) { 854 // the original ack may be a ranged ack, but we are trying to delete 855 // a specific 856 // message store here so we need to convert to a non ranged ack. 857 if (ack.getMessageCount() > 0) { 858 // Dup the ack 859 MessageAck a = new MessageAck(); 860 ack.copy(a); 861 ack = a; 862 // Convert to non-ranged. 863 ack.setMessageCount(1); 864 } 865 // always use node messageId so we can access entry/data Location 866 ack.setFirstMessageId(node.getMessageId()); 867 ack.setLastMessageId(node.getMessageId()); 868 return ack; 869 } 870 871 protected boolean isDLQ() { 872 return destination.isDLQ(); 873 } 874 875 @Override 876 public void duplicateFromStore(Message message, Subscription subscription) { 877 ConnectionContext connectionContext = createConnectionContext(); 878 getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId()); 879 Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination); 880 message.setRegionDestination(this); 881 broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); 882 MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); 883 messageAck.setPoisonCause(cause); 884 try { 885 acknowledge(connectionContext, subscription, messageAck, message); 886 } catch (IOException e) { 887 getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); 888 } 889 } 890 891 public void setPersistJMSRedelivered(boolean persistJMSRedelivered) { 892 this.persistJMSRedelivered = persistJMSRedelivered; 893 } 894 895 public boolean isPersistJMSRedelivered() { 896 return persistJMSRedelivered; 897 } 898 899 public SystemUsage getSystemUsage() { 900 return systemUsage; 901 } 902}