001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import javax.jms.ResourceAllocationException;
024
025import org.apache.activemq.advisory.AdvisorySupport;
026import org.apache.activemq.broker.Broker;
027import org.apache.activemq.broker.BrokerService;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.ProducerBrokerExchange;
030import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
031import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatchNotification;
037import org.apache.activemq.command.ProducerInfo;
038import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
039import org.apache.activemq.security.SecurityContext;
040import org.apache.activemq.state.ProducerState;
041import org.apache.activemq.store.MessageStore;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.usage.MemoryUsage;
044import org.apache.activemq.usage.SystemUsage;
045import org.apache.activemq.usage.Usage;
046import org.slf4j.Logger;
047
048/**
049 *
050 */
051public abstract class BaseDestination implements Destination {
052    /**
053     * The maximum number of messages to page in to the destination from
054     * persistent storage
055     */
056    public static final int MAX_PAGE_SIZE = 200;
057    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
058    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
059    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
060    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
061    public static final int MAX_AUDIT_DEPTH = 10000;
062    public static final String DUPLICATE_FROM_STORE_MSG_PREFIX = "duplicate from store for ";
063
064    protected final AtomicBoolean started = new AtomicBoolean();
065    protected final ActiveMQDestination destination;
066    protected final Broker broker;
067    protected final MessageStore store;
068    protected SystemUsage systemUsage;
069    protected MemoryUsage memoryUsage;
070    private boolean producerFlowControl = true;
071    private boolean alwaysRetroactive = false;
072    protected long lastBlockedProducerWarnTime = 0l;
073    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
074
075    private int maxProducersToAudit = 1024;
076    private int maxAuditDepth = 2048;
077    private boolean enableAudit = true;
078    private int maxPageSize = MAX_PAGE_SIZE;
079    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
080    private boolean useCache = true;
081    private int minimumMessageSize = 1024;
082    private boolean lazyDispatch = false;
083    private boolean advisoryForSlowConsumers;
084    private boolean advisoryForFastProducers;
085    private boolean advisoryForDiscardingMessages;
086    private boolean advisoryWhenFull;
087    private boolean advisoryForDelivery;
088    private boolean advisoryForConsumed;
089    private boolean sendAdvisoryIfNoConsumers;
090    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
091    protected final BrokerService brokerService;
092    protected final Broker regionBroker;
093    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
094    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
095    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
096    protected int cursorMemoryHighWaterMark = 70;
097    protected int storeUsageHighWaterMark = 100;
098    private SlowConsumerStrategy slowConsumerStrategy;
099    private boolean prioritizedMessages;
100    private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
101    private boolean gcIfInactive;
102    private boolean gcWithNetworkConsumers;
103    private long lastActiveTime=0l;
104    private boolean reduceMemoryFootprint = false;
105    protected final Scheduler scheduler;
106    private boolean disposed = false;
107    private boolean doOptimzeMessageStorage = true;
108    /*
109     * percentage of in-flight messages above which optimize message store is disabled
110     */
111    private int optimizeMessageStoreInFlightLimit = 10;
112    private boolean persistJMSRedelivered;
113
114    /**
115     * @param brokerService
116     * @param store
117     * @param destination
118     * @param parentStats
119     * @throws Exception
120     */
121    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
122        this.brokerService = brokerService;
123        this.broker = brokerService.getBroker();
124        this.store = store;
125        this.destination = destination;
126        // let's copy the enabled property from the parent DestinationStatistics
127        this.destinationStatistics.setEnabled(parentStats.isEnabled());
128        this.destinationStatistics.setParent(parentStats);
129        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
130        this.memoryUsage = this.systemUsage.getMemoryUsage();
131        this.memoryUsage.setUsagePortion(1.0f);
132        this.regionBroker = brokerService.getRegionBroker();
133        this.scheduler = brokerService.getBroker().getScheduler();
134    }
135
136    /**
137     * initialize the destination
138     *
139     * @throws Exception
140     */
141    public void initialize() throws Exception {
142        // Let the store know what usage manager we are using so that he can
143        // flush messages to disk when usage gets high.
144        if (store != null) {
145            store.setMemoryUsage(this.memoryUsage);
146        }
147    }
148
149    /**
150     * @return the producerFlowControl
151     */
152    @Override
153    public boolean isProducerFlowControl() {
154        return producerFlowControl;
155    }
156
157    /**
158     * @param producerFlowControl the producerFlowControl to set
159     */
160    @Override
161    public void setProducerFlowControl(boolean producerFlowControl) {
162        this.producerFlowControl = producerFlowControl;
163    }
164
165    @Override
166    public boolean isAlwaysRetroactive() {
167        return alwaysRetroactive;
168    }
169
170    @Override
171    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
172        this.alwaysRetroactive = alwaysRetroactive;
173    }
174
175    /**
176     * Set's the interval at which warnings about producers being blocked by
177     * resource usage will be triggered. Values of 0 or less will disable
178     * warnings
179     *
180     * @param blockedProducerWarningInterval the interval at which warning about
181     *            blocked producers will be triggered.
182     */
183    @Override
184    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
185        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
186    }
187
188    /**
189     *
190     * @return the interval at which warning about blocked producers will be
191     *         triggered.
192     */
193    @Override
194    public long getBlockedProducerWarningInterval() {
195        return blockedProducerWarningInterval;
196    }
197
198    /**
199     * @return the maxProducersToAudit
200     */
201    @Override
202    public int getMaxProducersToAudit() {
203        return maxProducersToAudit;
204    }
205
206    /**
207     * @param maxProducersToAudit the maxProducersToAudit to set
208     */
209    @Override
210    public void setMaxProducersToAudit(int maxProducersToAudit) {
211        this.maxProducersToAudit = maxProducersToAudit;
212    }
213
214    /**
215     * @return the maxAuditDepth
216     */
217    @Override
218    public int getMaxAuditDepth() {
219        return maxAuditDepth;
220    }
221
222    /**
223     * @param maxAuditDepth the maxAuditDepth to set
224     */
225    @Override
226    public void setMaxAuditDepth(int maxAuditDepth) {
227        this.maxAuditDepth = maxAuditDepth;
228    }
229
230    /**
231     * @return the enableAudit
232     */
233    @Override
234    public boolean isEnableAudit() {
235        return enableAudit;
236    }
237
238    /**
239     * @param enableAudit the enableAudit to set
240     */
241    @Override
242    public void setEnableAudit(boolean enableAudit) {
243        this.enableAudit = enableAudit;
244    }
245
246    @Override
247    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
248        destinationStatistics.getProducers().increment();
249        this.lastActiveTime=0l;
250    }
251
252    @Override
253    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
254        destinationStatistics.getProducers().decrement();
255    }
256
257    @Override
258    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
259        destinationStatistics.getConsumers().increment();
260        this.lastActiveTime=0l;
261    }
262
263    @Override
264    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
265        destinationStatistics.getConsumers().decrement();
266        this.lastActiveTime=0l;
267    }
268
269
270    @Override
271    public final MemoryUsage getMemoryUsage() {
272        return memoryUsage;
273    }
274
275    @Override
276    public void setMemoryUsage(MemoryUsage memoryUsage) {
277        this.memoryUsage = memoryUsage;
278    }
279
280    @Override
281    public DestinationStatistics getDestinationStatistics() {
282        return destinationStatistics;
283    }
284
285    @Override
286    public ActiveMQDestination getActiveMQDestination() {
287        return destination;
288    }
289
290    @Override
291    public final String getName() {
292        return getActiveMQDestination().getPhysicalName();
293    }
294
295    @Override
296    public final MessageStore getMessageStore() {
297        return store;
298    }
299
300    @Override
301    public boolean isActive() {
302        boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
303                           destinationStatistics.getProducers().getCount() > 0;
304        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
305            isActive = hasRegularConsumers(getConsumers());
306        }
307        return isActive;
308    }
309
310    @Override
311    public int getMaxPageSize() {
312        return maxPageSize;
313    }
314
315    @Override
316    public void setMaxPageSize(int maxPageSize) {
317        this.maxPageSize = maxPageSize;
318    }
319
320    @Override
321    public int getMaxBrowsePageSize() {
322        return this.maxBrowsePageSize;
323    }
324
325    @Override
326    public void setMaxBrowsePageSize(int maxPageSize) {
327        this.maxBrowsePageSize = maxPageSize;
328    }
329
330    public int getMaxExpirePageSize() {
331        return this.maxExpirePageSize;
332    }
333
334    public void setMaxExpirePageSize(int maxPageSize) {
335        this.maxExpirePageSize = maxPageSize;
336    }
337
338    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
339        this.expireMessagesPeriod = expireMessagesPeriod;
340    }
341
342    public long getExpireMessagesPeriod() {
343        return expireMessagesPeriod;
344    }
345
346    @Override
347    public boolean isUseCache() {
348        return useCache;
349    }
350
351    @Override
352    public void setUseCache(boolean useCache) {
353        this.useCache = useCache;
354    }
355
356    @Override
357    public int getMinimumMessageSize() {
358        return minimumMessageSize;
359    }
360
361    @Override
362    public void setMinimumMessageSize(int minimumMessageSize) {
363        this.minimumMessageSize = minimumMessageSize;
364    }
365
366    @Override
367    public boolean isLazyDispatch() {
368        return lazyDispatch;
369    }
370
371    @Override
372    public void setLazyDispatch(boolean lazyDispatch) {
373        this.lazyDispatch = lazyDispatch;
374    }
375
376    protected long getDestinationSequenceId() {
377        return regionBroker.getBrokerSequenceId();
378    }
379
380    /**
381     * @return the advisoryForSlowConsumers
382     */
383    public boolean isAdvisoryForSlowConsumers() {
384        return advisoryForSlowConsumers;
385    }
386
387    /**
388     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
389     */
390    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
391        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
392    }
393
394    /**
395     * @return the advisoryForDiscardingMessages
396     */
397    public boolean isAdvisoryForDiscardingMessages() {
398        return advisoryForDiscardingMessages;
399    }
400
401    /**
402     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
403     *            set
404     */
405    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
406        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
407    }
408
409    /**
410     * @return the advisoryWhenFull
411     */
412    public boolean isAdvisoryWhenFull() {
413        return advisoryWhenFull;
414    }
415
416    /**
417     * @param advisoryWhenFull the advisoryWhenFull to set
418     */
419    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
420        this.advisoryWhenFull = advisoryWhenFull;
421    }
422
423    /**
424     * @return the advisoryForDelivery
425     */
426    public boolean isAdvisoryForDelivery() {
427        return advisoryForDelivery;
428    }
429
430    /**
431     * @param advisoryForDelivery the advisoryForDelivery to set
432     */
433    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
434        this.advisoryForDelivery = advisoryForDelivery;
435    }
436
437    /**
438     * @return the advisoryForConsumed
439     */
440    public boolean isAdvisoryForConsumed() {
441        return advisoryForConsumed;
442    }
443
444    /**
445     * @param advisoryForConsumed the advisoryForConsumed to set
446     */
447    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
448        this.advisoryForConsumed = advisoryForConsumed;
449    }
450
451    /**
452     * @return the advisdoryForFastProducers
453     */
454    public boolean isAdvisoryForFastProducers() {
455        return advisoryForFastProducers;
456    }
457
458    /**
459     * @param advisoryForFastProducers the advisdoryForFastProducers to set
460     */
461    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
462        this.advisoryForFastProducers = advisoryForFastProducers;
463    }
464
465    public boolean isSendAdvisoryIfNoConsumers() {
466        return sendAdvisoryIfNoConsumers;
467    }
468
469    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
470        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
471    }
472
473    /**
474     * @return the dead letter strategy
475     */
476    @Override
477    public DeadLetterStrategy getDeadLetterStrategy() {
478        return deadLetterStrategy;
479    }
480
481    /**
482     * set the dead letter strategy
483     *
484     * @param deadLetterStrategy
485     */
486    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
487        this.deadLetterStrategy = deadLetterStrategy;
488    }
489
490    @Override
491    public int getCursorMemoryHighWaterMark() {
492        return this.cursorMemoryHighWaterMark;
493    }
494
495    @Override
496    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
497        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
498    }
499
500    /**
501     * called when message is consumed
502     *
503     * @param context
504     * @param messageReference
505     */
506    @Override
507    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
508        if (advisoryForConsumed) {
509            broker.messageConsumed(context, messageReference);
510        }
511    }
512
513    /**
514     * Called when message is delivered to the broker
515     *
516     * @param context
517     * @param messageReference
518     */
519    @Override
520    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
521        this.lastActiveTime = 0L;
522        if (advisoryForDelivery) {
523            broker.messageDelivered(context, messageReference);
524        }
525    }
526
527    /**
528     * Called when a message is discarded - e.g. running low on memory This will
529     * happen only if the policy is enabled - e.g. non durable topics
530     *
531     * @param context
532     * @param messageReference
533     */
534    @Override
535    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
536        if (advisoryForDiscardingMessages) {
537            broker.messageDiscarded(context, sub, messageReference);
538        }
539    }
540
541    /**
542     * Called when there is a slow consumer
543     *
544     * @param context
545     * @param subs
546     */
547    @Override
548    public void slowConsumer(ConnectionContext context, Subscription subs) {
549        if (advisoryForSlowConsumers) {
550            broker.slowConsumer(context, this, subs);
551        }
552        if (slowConsumerStrategy != null) {
553            slowConsumerStrategy.slowConsumer(context, subs);
554        }
555    }
556
557    /**
558     * Called to notify a producer is too fast
559     *
560     * @param context
561     * @param producerInfo
562     */
563    @Override
564    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
565        if (advisoryForFastProducers) {
566            broker.fastProducer(context, producerInfo, getActiveMQDestination());
567        }
568    }
569
570    /**
571     * Called when a Usage reaches a limit
572     *
573     * @param context
574     * @param usage
575     */
576    @Override
577    public void isFull(ConnectionContext context, Usage<?> usage) {
578        if (advisoryWhenFull) {
579            broker.isFull(context, this, usage);
580        }
581    }
582
583    @Override
584    public void dispose(ConnectionContext context) throws IOException {
585        if (this.store != null) {
586            this.store.removeAllMessages(context);
587            this.store.dispose(context);
588        }
589        this.destinationStatistics.setParent(null);
590        this.memoryUsage.stop();
591        this.disposed = true;
592    }
593
594    @Override
595    public boolean isDisposed() {
596        return this.disposed;
597    }
598
599    /**
600     * Provides a hook to allow messages with no consumer to be processed in
601     * some way - such as to send to a dead letter queue or something..
602     */
603    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
604        if (!msg.isPersistent()) {
605            if (isSendAdvisoryIfNoConsumers()) {
606                // allow messages with no consumers to be dispatched to a dead
607                // letter queue
608                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
609
610                    Message message = msg.copy();
611                    // The original destination and transaction id do not get
612                    // filled when the message is first sent,
613                    // it is only populated if the message is routed to another
614                    // destination like the DLQ
615                    if (message.getOriginalDestination() != null) {
616                        message.setOriginalDestination(message.getDestination());
617                    }
618                    if (message.getOriginalTransactionId() != null) {
619                        message.setOriginalTransactionId(message.getTransactionId());
620                    }
621
622                    ActiveMQTopic advisoryTopic;
623                    if (destination.isQueue()) {
624                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
625                    } else {
626                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
627                    }
628                    message.setDestination(advisoryTopic);
629                    message.setTransactionId(null);
630
631                    // Disable flow control for this since since we don't want
632                    // to block.
633                    boolean originalFlowControl = context.isProducerFlowControl();
634                    try {
635                        context.setProducerFlowControl(false);
636                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
637                        producerExchange.setMutable(false);
638                        producerExchange.setConnectionContext(context);
639                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
640                        context.getBroker().send(producerExchange, message);
641                    } finally {
642                        context.setProducerFlowControl(originalFlowControl);
643                    }
644
645                }
646            }
647        }
648    }
649
650    @Override
651    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
652    }
653
654    public final int getStoreUsageHighWaterMark() {
655        return this.storeUsageHighWaterMark;
656    }
657
658    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
659        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
660    }
661
662    protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
663        waitForSpace(context, producerBrokerExchange, usage, 100, warning);
664    }
665
666    protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
667        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
668            if (isFlowControlLogRequired()) {
669                getLog().info("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
670            } else {
671                getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
672            }
673            throw new ResourceAllocationException(warning);
674        }
675        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
676            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
677                if (isFlowControlLogRequired()) {
678                    getLog().info("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
679                } else {
680                    getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
681                }
682                throw new ResourceAllocationException(warning);
683            }
684        } else {
685            long start = System.currentTimeMillis();
686            producerBrokerExchange.blockingOnFlowControl(true);
687            destinationStatistics.getBlockedSends().increment();
688            while (!usage.waitForSpace(1000, highWaterMark)) {
689                if (context.getStopping().get()) {
690                    throw new IOException("Connection closed, send aborted.");
691                }
692
693                if (isFlowControlLogRequired()) {
694                    getLog().warn("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
695                } else {
696                    getLog().debug("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
697                }
698            }
699            long finish = System.currentTimeMillis();
700            long totalTimeBlocked = finish - start;
701            destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
702            producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
703            producerBrokerExchange.blockingOnFlowControl(false);
704        }
705    }
706
707    protected boolean isFlowControlLogRequired() {
708        boolean answer = false;
709        if (blockedProducerWarningInterval > 0) {
710            long now = System.currentTimeMillis();
711            if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) {
712                lastBlockedProducerWarnTime = now;
713                answer = true;
714            }
715        }
716        return answer;
717    }
718
719    protected abstract Logger getLog();
720
721    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
722        this.slowConsumerStrategy = slowConsumerStrategy;
723    }
724
725    @Override
726    public SlowConsumerStrategy getSlowConsumerStrategy() {
727        return this.slowConsumerStrategy;
728    }
729
730
731    @Override
732    public boolean isPrioritizedMessages() {
733        return this.prioritizedMessages;
734    }
735
736    public void setPrioritizedMessages(boolean prioritizedMessages) {
737        this.prioritizedMessages = prioritizedMessages;
738        if (store != null) {
739            store.setPrioritizedMessages(prioritizedMessages);
740        }
741    }
742
743    /**
744     * @return the inactiveTimeoutBeforeGC
745     */
746    @Override
747    public long getInactiveTimeoutBeforeGC() {
748        return this.inactiveTimeoutBeforeGC;
749    }
750
751    /**
752     * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set
753     */
754    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
755        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
756    }
757
758    /**
759     * @return the gcIfInactive
760     */
761    public boolean isGcIfInactive() {
762        return this.gcIfInactive;
763    }
764
765    /**
766     * @param gcIfInactive the gcIfInactive to set
767     */
768    public void setGcIfInactive(boolean gcIfInactive) {
769        this.gcIfInactive = gcIfInactive;
770    }
771
772    /**
773     * Indicate if it is ok to gc destinations that have only network consumers
774     * @param gcWithNetworkConsumers
775     */
776    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
777        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
778    }
779
780    public boolean isGcWithNetworkConsumers() {
781        return gcWithNetworkConsumers;
782    }
783
784    @Override
785    public void markForGC(long timeStamp) {
786        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
787                && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
788            this.lastActiveTime = timeStamp;
789        }
790    }
791
792    @Override
793    public boolean canGC() {
794        boolean result = false;
795        final long currentLastActiveTime = this.lastActiveTime;
796        if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
797            if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
798                result = true;
799            }
800        }
801        return result;
802    }
803
804    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
805        this.reduceMemoryFootprint = reduceMemoryFootprint;
806    }
807
808    protected boolean isReduceMemoryFootprint() {
809        return this.reduceMemoryFootprint;
810    }
811
812    @Override
813    public boolean isDoOptimzeMessageStorage() {
814        return doOptimzeMessageStorage;
815    }
816
817    @Override
818    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
819        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
820    }
821
822    public int getOptimizeMessageStoreInFlightLimit() {
823        return optimizeMessageStoreInFlightLimit;
824    }
825
826    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
827        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
828    }
829
830
831    @Override
832    public abstract List<Subscription> getConsumers();
833
834    protected boolean hasRegularConsumers(List<Subscription> consumers) {
835        boolean hasRegularConsumers = false;
836        for (Subscription subscription: consumers) {
837            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
838                hasRegularConsumers = true;
839                break;
840            }
841        }
842        return hasRegularConsumers;
843    }
844
845    public ConnectionContext createConnectionContext() {
846        ConnectionContext answer = new ConnectionContext();
847        answer.setBroker(this.broker);
848        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
849        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
850        return answer;
851    }
852
853    protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
854        // the original ack may be a ranged ack, but we are trying to delete
855        // a specific
856        // message store here so we need to convert to a non ranged ack.
857        if (ack.getMessageCount() > 0) {
858            // Dup the ack
859            MessageAck a = new MessageAck();
860            ack.copy(a);
861            ack = a;
862            // Convert to non-ranged.
863            ack.setMessageCount(1);
864        }
865        // always use node messageId so we can access entry/data Location
866        ack.setFirstMessageId(node.getMessageId());
867        ack.setLastMessageId(node.getMessageId());
868        return ack;
869    }
870
871    protected boolean isDLQ() {
872        return destination.isDLQ();
873    }
874
875    @Override
876    public void duplicateFromStore(Message message, Subscription subscription) {
877        ConnectionContext connectionContext = createConnectionContext();
878        getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
879        Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
880        message.setRegionDestination(this);
881        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
882        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
883        messageAck.setPoisonCause(cause);
884        try {
885            acknowledge(connectionContext, subscription, messageAck, message);
886        } catch (IOException e) {
887            getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
888        }
889    }
890
891    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
892        this.persistJMSRedelivered = persistJMSRedelivered;
893    }
894
895    public boolean isPersistJMSRedelivered() {
896        return persistJMSRedelivered;
897    }
898
899    public SystemUsage getSystemUsage() {
900        return systemUsage;
901    }
902}