001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import javax.jms.ResourceAllocationException;
024
025import org.apache.activemq.advisory.AdvisorySupport;
026import org.apache.activemq.broker.Broker;
027import org.apache.activemq.broker.BrokerService;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.ProducerBrokerExchange;
030import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
031import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageDispatchNotification;
037import org.apache.activemq.command.ProducerInfo;
038import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
039import org.apache.activemq.security.SecurityContext;
040import org.apache.activemq.state.ProducerState;
041import org.apache.activemq.store.MessageStore;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.usage.MemoryUsage;
044import org.apache.activemq.usage.SystemUsage;
045import org.apache.activemq.usage.Usage;
046import org.slf4j.Logger;
047
048/**
049 *
050 */
051public abstract class BaseDestination implements Destination {
052    /**
053     * The maximum number of messages to page in to the destination from
054     * persistent storage
055     */
056    public static final int MAX_PAGE_SIZE = 200;
057    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
058    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
059    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
060    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
061    public static final int MAX_AUDIT_DEPTH = 10000;
062
063    protected final AtomicBoolean started = new AtomicBoolean();
064    protected final ActiveMQDestination destination;
065    protected final Broker broker;
066    protected final MessageStore store;
067    protected SystemUsage systemUsage;
068    protected MemoryUsage memoryUsage;
069    private boolean producerFlowControl = true;
070    private boolean alwaysRetroactive = false;
071    protected long lastBlockedProducerWarnTime = 0l;
072    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
073
074    private int maxProducersToAudit = 1024;
075    private int maxAuditDepth = 2048;
076    private boolean enableAudit = true;
077    private int maxPageSize = MAX_PAGE_SIZE;
078    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
079    private boolean useCache = true;
080    private int minimumMessageSize = 1024;
081    private boolean lazyDispatch = false;
082    private boolean advisoryForSlowConsumers;
083    private boolean advisoryForFastProducers;
084    private boolean advisoryForDiscardingMessages;
085    private boolean advisoryWhenFull;
086    private boolean advisoryForDelivery;
087    private boolean advisoryForConsumed;
088    private boolean sendAdvisoryIfNoConsumers;
089    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
090    protected final BrokerService brokerService;
091    protected final Broker regionBroker;
092    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
093    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
094    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
095    protected int cursorMemoryHighWaterMark = 70;
096    protected int storeUsageHighWaterMark = 100;
097    private SlowConsumerStrategy slowConsumerStrategy;
098    private boolean prioritizedMessages;
099    private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
100    private boolean gcIfInactive;
101    private boolean gcWithNetworkConsumers;
102    private long lastActiveTime=0l;
103    private boolean reduceMemoryFootprint = false;
104    protected final Scheduler scheduler;
105    private boolean disposed = false;
106    private boolean doOptimzeMessageStorage = true;
107    /*
108     * percentage of in-flight messages above which optimize message store is disabled
109     */
110    private int optimizeMessageStoreInFlightLimit = 10;
111    private boolean persistJMSRedelivered;
112
113    /**
114     * @param brokerService
115     * @param store
116     * @param destination
117     * @param parentStats
118     * @throws Exception
119     */
120    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
121        this.brokerService = brokerService;
122        this.broker = brokerService.getBroker();
123        this.store = store;
124        this.destination = destination;
125        // let's copy the enabled property from the parent DestinationStatistics
126        this.destinationStatistics.setEnabled(parentStats.isEnabled());
127        this.destinationStatistics.setParent(parentStats);
128        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
129        this.memoryUsage = this.systemUsage.getMemoryUsage();
130        this.memoryUsage.setUsagePortion(1.0f);
131        this.regionBroker = brokerService.getRegionBroker();
132        this.scheduler = brokerService.getBroker().getScheduler();
133    }
134
135    /**
136     * initialize the destination
137     *
138     * @throws Exception
139     */
140    public void initialize() throws Exception {
141        // Let the store know what usage manager we are using so that he can
142        // flush messages to disk when usage gets high.
143        if (store != null) {
144            store.setMemoryUsage(this.memoryUsage);
145        }
146    }
147
148    /**
149     * @return the producerFlowControl
150     */
151    @Override
152    public boolean isProducerFlowControl() {
153        return producerFlowControl;
154    }
155
156    /**
157     * @param producerFlowControl the producerFlowControl to set
158     */
159    @Override
160    public void setProducerFlowControl(boolean producerFlowControl) {
161        this.producerFlowControl = producerFlowControl;
162    }
163
164    @Override
165    public boolean isAlwaysRetroactive() {
166        return alwaysRetroactive;
167    }
168
169    @Override
170    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
171        this.alwaysRetroactive = alwaysRetroactive;
172    }
173
174    /**
175     * Set's the interval at which warnings about producers being blocked by
176     * resource usage will be triggered. Values of 0 or less will disable
177     * warnings
178     *
179     * @param blockedProducerWarningInterval the interval at which warning about
180     *            blocked producers will be triggered.
181     */
182    @Override
183    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
184        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
185    }
186
187    /**
188     *
189     * @return the interval at which warning about blocked producers will be
190     *         triggered.
191     */
192    @Override
193    public long getBlockedProducerWarningInterval() {
194        return blockedProducerWarningInterval;
195    }
196
197    /**
198     * @return the maxProducersToAudit
199     */
200    @Override
201    public int getMaxProducersToAudit() {
202        return maxProducersToAudit;
203    }
204
205    /**
206     * @param maxProducersToAudit the maxProducersToAudit to set
207     */
208    @Override
209    public void setMaxProducersToAudit(int maxProducersToAudit) {
210        this.maxProducersToAudit = maxProducersToAudit;
211    }
212
213    /**
214     * @return the maxAuditDepth
215     */
216    @Override
217    public int getMaxAuditDepth() {
218        return maxAuditDepth;
219    }
220
221    /**
222     * @param maxAuditDepth the maxAuditDepth to set
223     */
224    @Override
225    public void setMaxAuditDepth(int maxAuditDepth) {
226        this.maxAuditDepth = maxAuditDepth;
227    }
228
229    /**
230     * @return the enableAudit
231     */
232    @Override
233    public boolean isEnableAudit() {
234        return enableAudit;
235    }
236
237    /**
238     * @param enableAudit the enableAudit to set
239     */
240    @Override
241    public void setEnableAudit(boolean enableAudit) {
242        this.enableAudit = enableAudit;
243    }
244
245    @Override
246    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
247        destinationStatistics.getProducers().increment();
248        this.lastActiveTime=0l;
249    }
250
251    @Override
252    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
253        destinationStatistics.getProducers().decrement();
254    }
255
256    @Override
257    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
258        destinationStatistics.getConsumers().increment();
259        this.lastActiveTime=0l;
260    }
261
262    @Override
263    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
264        destinationStatistics.getConsumers().decrement();
265        this.lastActiveTime=0l;
266    }
267
268
269    @Override
270    public final MemoryUsage getMemoryUsage() {
271        return memoryUsage;
272    }
273
274    @Override
275    public void setMemoryUsage(MemoryUsage memoryUsage) {
276        this.memoryUsage = memoryUsage;
277    }
278
279    @Override
280    public DestinationStatistics getDestinationStatistics() {
281        return destinationStatistics;
282    }
283
284    @Override
285    public ActiveMQDestination getActiveMQDestination() {
286        return destination;
287    }
288
289    @Override
290    public final String getName() {
291        return getActiveMQDestination().getPhysicalName();
292    }
293
294    @Override
295    public final MessageStore getMessageStore() {
296        return store;
297    }
298
299    @Override
300    public boolean isActive() {
301        boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
302                           destinationStatistics.getProducers().getCount() > 0;
303        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
304            isActive = hasRegularConsumers(getConsumers());
305        }
306        return isActive;
307    }
308
309    @Override
310    public int getMaxPageSize() {
311        return maxPageSize;
312    }
313
314    @Override
315    public void setMaxPageSize(int maxPageSize) {
316        this.maxPageSize = maxPageSize;
317    }
318
319    @Override
320    public int getMaxBrowsePageSize() {
321        return this.maxBrowsePageSize;
322    }
323
324    @Override
325    public void setMaxBrowsePageSize(int maxPageSize) {
326        this.maxBrowsePageSize = maxPageSize;
327    }
328
329    public int getMaxExpirePageSize() {
330        return this.maxExpirePageSize;
331    }
332
333    public void setMaxExpirePageSize(int maxPageSize) {
334        this.maxExpirePageSize = maxPageSize;
335    }
336
337    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
338        this.expireMessagesPeriod = expireMessagesPeriod;
339    }
340
341    public long getExpireMessagesPeriod() {
342        return expireMessagesPeriod;
343    }
344
345    @Override
346    public boolean isUseCache() {
347        return useCache;
348    }
349
350    @Override
351    public void setUseCache(boolean useCache) {
352        this.useCache = useCache;
353    }
354
355    @Override
356    public int getMinimumMessageSize() {
357        return minimumMessageSize;
358    }
359
360    @Override
361    public void setMinimumMessageSize(int minimumMessageSize) {
362        this.minimumMessageSize = minimumMessageSize;
363    }
364
365    @Override
366    public boolean isLazyDispatch() {
367        return lazyDispatch;
368    }
369
370    @Override
371    public void setLazyDispatch(boolean lazyDispatch) {
372        this.lazyDispatch = lazyDispatch;
373    }
374
375    protected long getDestinationSequenceId() {
376        return regionBroker.getBrokerSequenceId();
377    }
378
379    /**
380     * @return the advisoryForSlowConsumers
381     */
382    public boolean isAdvisoryForSlowConsumers() {
383        return advisoryForSlowConsumers;
384    }
385
386    /**
387     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
388     */
389    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
390        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
391    }
392
393    /**
394     * @return the advisoryForDiscardingMessages
395     */
396    public boolean isAdvisoryForDiscardingMessages() {
397        return advisoryForDiscardingMessages;
398    }
399
400    /**
401     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
402     *            set
403     */
404    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
405        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
406    }
407
408    /**
409     * @return the advisoryWhenFull
410     */
411    public boolean isAdvisoryWhenFull() {
412        return advisoryWhenFull;
413    }
414
415    /**
416     * @param advisoryWhenFull the advisoryWhenFull to set
417     */
418    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
419        this.advisoryWhenFull = advisoryWhenFull;
420    }
421
422    /**
423     * @return the advisoryForDelivery
424     */
425    public boolean isAdvisoryForDelivery() {
426        return advisoryForDelivery;
427    }
428
429    /**
430     * @param advisoryForDelivery the advisoryForDelivery to set
431     */
432    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
433        this.advisoryForDelivery = advisoryForDelivery;
434    }
435
436    /**
437     * @return the advisoryForConsumed
438     */
439    public boolean isAdvisoryForConsumed() {
440        return advisoryForConsumed;
441    }
442
443    /**
444     * @param advisoryForConsumed the advisoryForConsumed to set
445     */
446    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
447        this.advisoryForConsumed = advisoryForConsumed;
448    }
449
450    /**
451     * @return the advisdoryForFastProducers
452     */
453    public boolean isAdvisoryForFastProducers() {
454        return advisoryForFastProducers;
455    }
456
457    /**
458     * @param advisoryForFastProducers the advisdoryForFastProducers to set
459     */
460    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
461        this.advisoryForFastProducers = advisoryForFastProducers;
462    }
463
464    public boolean isSendAdvisoryIfNoConsumers() {
465        return sendAdvisoryIfNoConsumers;
466    }
467
468    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
469        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
470    }
471
472    /**
473     * @return the dead letter strategy
474     */
475    @Override
476    public DeadLetterStrategy getDeadLetterStrategy() {
477        return deadLetterStrategy;
478    }
479
480    /**
481     * set the dead letter strategy
482     *
483     * @param deadLetterStrategy
484     */
485    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
486        this.deadLetterStrategy = deadLetterStrategy;
487    }
488
489    @Override
490    public int getCursorMemoryHighWaterMark() {
491        return this.cursorMemoryHighWaterMark;
492    }
493
494    @Override
495    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
496        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
497    }
498
499    /**
500     * called when message is consumed
501     *
502     * @param context
503     * @param messageReference
504     */
505    @Override
506    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
507        if (advisoryForConsumed) {
508            broker.messageConsumed(context, messageReference);
509        }
510    }
511
512    /**
513     * Called when message is delivered to the broker
514     *
515     * @param context
516     * @param messageReference
517     */
518    @Override
519    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
520        this.lastActiveTime = 0L;
521        if (advisoryForDelivery) {
522            broker.messageDelivered(context, messageReference);
523        }
524    }
525
526    /**
527     * Called when a message is discarded - e.g. running low on memory This will
528     * happen only if the policy is enabled - e.g. non durable topics
529     *
530     * @param context
531     * @param messageReference
532     */
533    @Override
534    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
535        if (advisoryForDiscardingMessages) {
536            broker.messageDiscarded(context, sub, messageReference);
537        }
538    }
539
540    /**
541     * Called when there is a slow consumer
542     *
543     * @param context
544     * @param subs
545     */
546    @Override
547    public void slowConsumer(ConnectionContext context, Subscription subs) {
548        if (advisoryForSlowConsumers) {
549            broker.slowConsumer(context, this, subs);
550        }
551        if (slowConsumerStrategy != null) {
552            slowConsumerStrategy.slowConsumer(context, subs);
553        }
554    }
555
556    /**
557     * Called to notify a producer is too fast
558     *
559     * @param context
560     * @param producerInfo
561     */
562    @Override
563    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
564        if (advisoryForFastProducers) {
565            broker.fastProducer(context, producerInfo, getActiveMQDestination());
566        }
567    }
568
569    /**
570     * Called when a Usage reaches a limit
571     *
572     * @param context
573     * @param usage
574     */
575    @Override
576    public void isFull(ConnectionContext context, Usage<?> usage) {
577        if (advisoryWhenFull) {
578            broker.isFull(context, this, usage);
579        }
580    }
581
582    @Override
583    public void dispose(ConnectionContext context) throws IOException {
584        if (this.store != null) {
585            this.store.removeAllMessages(context);
586            this.store.dispose(context);
587        }
588        this.destinationStatistics.setParent(null);
589        this.memoryUsage.stop();
590        this.disposed = true;
591    }
592
593    @Override
594    public boolean isDisposed() {
595        return this.disposed;
596    }
597
598    /**
599     * Provides a hook to allow messages with no consumer to be processed in
600     * some way - such as to send to a dead letter queue or something..
601     */
602    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
603        if (!msg.isPersistent()) {
604            if (isSendAdvisoryIfNoConsumers()) {
605                // allow messages with no consumers to be dispatched to a dead
606                // letter queue
607                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
608
609                    Message message = msg.copy();
610                    // The original destination and transaction id do not get
611                    // filled when the message is first sent,
612                    // it is only populated if the message is routed to another
613                    // destination like the DLQ
614                    if (message.getOriginalDestination() != null) {
615                        message.setOriginalDestination(message.getDestination());
616                    }
617                    if (message.getOriginalTransactionId() != null) {
618                        message.setOriginalTransactionId(message.getTransactionId());
619                    }
620
621                    ActiveMQTopic advisoryTopic;
622                    if (destination.isQueue()) {
623                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
624                    } else {
625                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
626                    }
627                    message.setDestination(advisoryTopic);
628                    message.setTransactionId(null);
629
630                    // Disable flow control for this since since we don't want
631                    // to block.
632                    boolean originalFlowControl = context.isProducerFlowControl();
633                    try {
634                        context.setProducerFlowControl(false);
635                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
636                        producerExchange.setMutable(false);
637                        producerExchange.setConnectionContext(context);
638                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
639                        context.getBroker().send(producerExchange, message);
640                    } finally {
641                        context.setProducerFlowControl(originalFlowControl);
642                    }
643
644                }
645            }
646        }
647    }
648
649    @Override
650    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
651    }
652
653    public final int getStoreUsageHighWaterMark() {
654        return this.storeUsageHighWaterMark;
655    }
656
657    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
658        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
659    }
660
661    protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
662        waitForSpace(context, producerBrokerExchange, usage, 100, warning);
663    }
664
665    protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
666        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
667            if (isFlowControlLogRequired()) {
668                getLog().info("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
669            } else {
670                getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
671            }
672            throw new ResourceAllocationException(warning);
673        }
674        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
675            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
676                if (isFlowControlLogRequired()) {
677                    getLog().info("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
678                } else {
679                    getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
680                }
681                throw new ResourceAllocationException(warning);
682            }
683        } else {
684            long start = System.currentTimeMillis();
685            producerBrokerExchange.blockingOnFlowControl(true);
686            destinationStatistics.getBlockedSends().increment();
687            while (!usage.waitForSpace(1000, highWaterMark)) {
688                if (context.getStopping().get()) {
689                    throw new IOException("Connection closed, send aborted.");
690                }
691
692                if (isFlowControlLogRequired()) {
693                    getLog().warn("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
694                } else {
695                    getLog().debug("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((System.currentTimeMillis() - start) / 1000))});
696                }
697            }
698            long finish = System.currentTimeMillis();
699            long totalTimeBlocked = finish - start;
700            destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
701            producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
702            producerBrokerExchange.blockingOnFlowControl(false);
703        }
704    }
705
706    protected boolean isFlowControlLogRequired() {
707        boolean answer = false;
708        if (blockedProducerWarningInterval > 0) {
709            long now = System.currentTimeMillis();
710            if (lastBlockedProducerWarnTime + blockedProducerWarningInterval <= now) {
711                lastBlockedProducerWarnTime = now;
712                answer = true;
713            }
714        }
715        return answer;
716    }
717
718    protected abstract Logger getLog();
719
720    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
721        this.slowConsumerStrategy = slowConsumerStrategy;
722    }
723
724    @Override
725    public SlowConsumerStrategy getSlowConsumerStrategy() {
726        return this.slowConsumerStrategy;
727    }
728
729
730    @Override
731    public boolean isPrioritizedMessages() {
732        return this.prioritizedMessages;
733    }
734
735    public void setPrioritizedMessages(boolean prioritizedMessages) {
736        this.prioritizedMessages = prioritizedMessages;
737        if (store != null) {
738            store.setPrioritizedMessages(prioritizedMessages);
739        }
740    }
741
742    /**
743     * @return the inactiveTimeoutBeforeGC
744     */
745    @Override
746    public long getInactiveTimeoutBeforeGC() {
747        return this.inactiveTimeoutBeforeGC;
748    }
749
750    /**
751     * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set
752     */
753    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
754        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
755    }
756
757    /**
758     * @return the gcIfInactive
759     */
760    public boolean isGcIfInactive() {
761        return this.gcIfInactive;
762    }
763
764    /**
765     * @param gcIfInactive the gcIfInactive to set
766     */
767    public void setGcIfInactive(boolean gcIfInactive) {
768        this.gcIfInactive = gcIfInactive;
769    }
770
771    /**
772     * Indicate if it is ok to gc destinations that have only network consumers
773     * @param gcWithNetworkConsumers
774     */
775    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
776        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
777    }
778
779    public boolean isGcWithNetworkConsumers() {
780        return gcWithNetworkConsumers;
781    }
782
783    @Override
784    public void markForGC(long timeStamp) {
785        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
786                && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
787            this.lastActiveTime = timeStamp;
788        }
789    }
790
791    @Override
792    public boolean canGC() {
793        boolean result = false;
794        final long currentLastActiveTime = this.lastActiveTime;
795        if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) {
796            if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
797                result = true;
798            }
799        }
800        return result;
801    }
802
803    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
804        this.reduceMemoryFootprint = reduceMemoryFootprint;
805    }
806
807    protected boolean isReduceMemoryFootprint() {
808        return this.reduceMemoryFootprint;
809    }
810
811    @Override
812    public boolean isDoOptimzeMessageStorage() {
813        return doOptimzeMessageStorage;
814    }
815
816    @Override
817    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
818        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
819    }
820
821    public int getOptimizeMessageStoreInFlightLimit() {
822        return optimizeMessageStoreInFlightLimit;
823    }
824
825    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
826        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
827    }
828
829
830    @Override
831    public abstract List<Subscription> getConsumers();
832
833    protected boolean hasRegularConsumers(List<Subscription> consumers) {
834        boolean hasRegularConsumers = false;
835        for (Subscription subscription: consumers) {
836            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
837                hasRegularConsumers = true;
838                break;
839            }
840        }
841        return hasRegularConsumers;
842    }
843
844    public ConnectionContext createConnectionContext() {
845        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
846        answer.setBroker(this.broker);
847        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
848        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
849        return answer;
850    }
851
852    protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
853        // the original ack may be a ranged ack, but we are trying to delete
854        // a specific
855        // message store here so we need to convert to a non ranged ack.
856        if (ack.getMessageCount() > 0) {
857            // Dup the ack
858            MessageAck a = new MessageAck();
859            ack.copy(a);
860            ack = a;
861            // Convert to non-ranged.
862            ack.setMessageCount(1);
863        }
864        // always use node messageId so we can access entry/data Location
865        ack.setFirstMessageId(node.getMessageId());
866        ack.setLastMessageId(node.getMessageId());
867        return ack;
868    }
869
870    protected boolean isDLQ() {
871        return destination.isDLQ();
872    }
873
874    @Override
875    public void duplicateFromStore(Message message, Subscription durableSub) {
876        ConnectionContext connectionContext = createConnectionContext();
877        getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
878        Throwable cause = new Throwable("duplicate from store for " + destination);
879        message.setRegionDestination(this);
880        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
881        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
882        messageAck.setPoisonCause(cause);
883        try {
884            acknowledge(connectionContext, durableSub, messageAck, message);
885        } catch (IOException e) {
886            getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
887        }
888    }
889
890    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
891        this.persistJMSRedelivered = persistJMSRedelivered;
892    }
893
894    public boolean isPersistJMSRedelivered() {
895        return persistJMSRedelivered;
896    }
897
898    public SystemUsage getSystemUsage() {
899        return systemUsage;
900    }
901}