001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.Set;
020
021import org.apache.activemq.ActiveMQPrefetchPolicy;
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.region.BaseDestination;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.broker.region.DurableTopicSubscription;
026import org.apache.activemq.broker.region.Queue;
027import org.apache.activemq.broker.region.QueueBrowserSubscription;
028import org.apache.activemq.broker.region.QueueSubscription;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.broker.region.Topic;
031import org.apache.activemq.broker.region.TopicSubscription;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.group.GroupFactoryFinder;
034import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
035import org.apache.activemq.filter.DestinationMapEntry;
036import org.apache.activemq.network.NetworkBridgeFilterFactory;
037import org.apache.activemq.usage.SystemUsage;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Represents an entry in a {@link PolicyMap} for assigning policies to a
043 * specific destination or a hierarchical wildcard area of destinations.
044 *
045 * @org.apache.xbean.XBean
046 *
047 */
048public class PolicyEntry extends DestinationMapEntry {
049
050    private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class);
051    private DispatchPolicy dispatchPolicy;
052    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
053    private boolean sendAdvisoryIfNoConsumers;
054    private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
055    private PendingMessageLimitStrategy pendingMessageLimitStrategy;
056    private MessageEvictionStrategy messageEvictionStrategy;
057    private long memoryLimit;
058    private String messageGroupMapFactoryType = "cached";
059    private MessageGroupMapFactory messageGroupMapFactory;
060    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
061    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
062    private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
063    private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT;
064    private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
065    private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
066    private boolean enableAudit=true;
067    private boolean producerFlowControl = true;
068    private boolean alwaysRetroactive = false;
069    private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
070    private boolean optimizedDispatch=false;
071    private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
072    private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
073    private boolean useCache=true;
074    private long minimumMessageSize=1024;
075    private boolean useConsumerPriority=true;
076    private boolean strictOrderDispatch=false;
077    private boolean lazyDispatch=false;
078    private int timeBeforeDispatchStarts = 0;
079    private int consumersBeforeDispatchStarts = 0;
080    private boolean advisoryForSlowConsumers;
081    private boolean advisoryForFastProducers;
082    private boolean advisoryForDiscardingMessages;
083    private boolean advisoryWhenFull;
084    private boolean advisoryForDelivery;
085    private boolean advisoryForConsumed;
086    private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
087    private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
088    private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
089    private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
090    private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
091    private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
092    private boolean usePrefetchExtension = true;
093    private int cursorMemoryHighWaterMark = 70;
094    private int storeUsageHighWaterMark = 100;
095    private SlowConsumerStrategy slowConsumerStrategy;
096    private boolean prioritizedMessages;
097    private boolean allConsumersExclusiveByDefault;
098    private boolean gcInactiveDestinations;
099    private boolean gcWithNetworkConsumers;
100    private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
101    private boolean reduceMemoryFootprint;
102    private NetworkBridgeFilterFactory networkBridgeFilterFactory;
103    private boolean doOptimzeMessageStorage = true;
104    /*
105     * percentage of in-flight messages above which optimize message store is disabled
106     */
107    private int optimizeMessageStoreInFlightLimit = 10;
108    private boolean persistJMSRedelivered = false;
109    private int sendFailIfNoSpace = -1;
110    private long sendFailIfNoSpaceAfterTimeout = -1;
111
112
113    public void configure(Broker broker,Queue queue) {
114        baseConfiguration(broker,queue);
115        if (dispatchPolicy != null) {
116            queue.setDispatchPolicy(dispatchPolicy);
117        }
118        queue.setDeadLetterStrategy(getDeadLetterStrategy());
119        queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
120        if (memoryLimit > 0) {
121            queue.getMemoryUsage().setLimit(memoryLimit);
122        }
123        if (pendingQueuePolicy != null) {
124            PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
125            queue.setMessages(messages);
126        }
127
128        queue.setUseConsumerPriority(isUseConsumerPriority());
129        queue.setStrictOrderDispatch(isStrictOrderDispatch());
130        queue.setOptimizedDispatch(isOptimizedDispatch());
131        queue.setLazyDispatch(isLazyDispatch());
132        queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
133        queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
134        queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
135        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
136    }
137
138    public void update(Queue queue) {
139        update(queue, null);
140    }
141
142    /**
143     * Update a queue with this policy.  Only apply properties that
144     * match the includedProperties list.  Not all properties are eligible
145     * to be updated.
146     *
147     * If includedProperties is null then all of the properties will be set as
148     * isUpdate will return true
149     * @param baseDestination
150     * @param includedProperties
151     */
152    public void update(Queue queue, Set<String> includedProperties) {
153        baseUpdate(queue, includedProperties);
154        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
155            queue.getMemoryUsage().setLimit(memoryLimit);
156        }
157        if (isUpdate("useConsumerPriority", includedProperties)) {
158            queue.setUseConsumerPriority(isUseConsumerPriority());
159        }
160        if (isUpdate("strictOrderDispatch", includedProperties)) {
161            queue.setStrictOrderDispatch(isStrictOrderDispatch());
162        }
163        if (isUpdate("optimizedDispatch", includedProperties)) {
164            queue.setOptimizedDispatch(isOptimizedDispatch());
165        }
166        if (isUpdate("lazyDispatch", includedProperties)) {
167            queue.setLazyDispatch(isLazyDispatch());
168        }
169        if (isUpdate("timeBeforeDispatchStarts", includedProperties)) {
170            queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
171        }
172        if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) {
173            queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
174        }
175        if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) {
176            queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
177        }
178        if (isUpdate("persistJMSRedelivered", includedProperties)) {
179            queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
180        }
181    }
182
183    public void configure(Broker broker,Topic topic) {
184        baseConfiguration(broker,topic);
185        if (dispatchPolicy != null) {
186            topic.setDispatchPolicy(dispatchPolicy);
187        }
188        topic.setDeadLetterStrategy(getDeadLetterStrategy());
189        if (subscriptionRecoveryPolicy != null) {
190            SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy();
191            srp.setBroker(broker);
192            topic.setSubscriptionRecoveryPolicy(srp);
193        }
194        if (memoryLimit > 0) {
195            topic.getMemoryUsage().setLimit(memoryLimit);
196        }
197        topic.setLazyDispatch(isLazyDispatch());
198    }
199
200    public void update(Topic topic) {
201        update(topic, null);
202    }
203
204    //If includedProperties is null then all of the properties will be set as
205    //isUpdate will return true
206    public void update(Topic topic, Set<String> includedProperties) {
207        baseUpdate(topic, includedProperties);
208        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
209            topic.getMemoryUsage().setLimit(memoryLimit);
210        }
211        if (isUpdate("lazyDispatch", includedProperties)) {
212            topic.setLazyDispatch(isLazyDispatch());
213        }
214    }
215
216    // attributes that can change on the fly
217    public void baseUpdate(BaseDestination destination) {
218        baseUpdate(destination, null);
219    }
220
221    // attributes that can change on the fly
222    //If includedProperties is null then all of the properties will be set as
223    //isUpdate will return true
224    public void baseUpdate(BaseDestination destination, Set<String> includedProperties) {
225        if (isUpdate("producerFlowControl", includedProperties)) {
226            destination.setProducerFlowControl(isProducerFlowControl());
227        }
228        if (isUpdate("alwaysRetroactive", includedProperties)) {
229            destination.setAlwaysRetroactive(isAlwaysRetroactive());
230        }
231        if (isUpdate("blockedProducerWarningInterval", includedProperties)) {
232            destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
233        }
234        if (isUpdate("maxPageSize", includedProperties)) {
235            destination.setMaxPageSize(getMaxPageSize());
236        }
237        if (isUpdate("maxBrowsePageSize", includedProperties)) {
238            destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
239        }
240
241        if (isUpdate("minimumMessageSize", includedProperties)) {
242            destination.setMinimumMessageSize((int) getMinimumMessageSize());
243        }
244        if (isUpdate("maxExpirePageSize", includedProperties)) {
245            destination.setMaxExpirePageSize(getMaxExpirePageSize());
246        }
247        if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) {
248            destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
249        }
250        if (isUpdate("storeUsageHighWaterMark", includedProperties)) {
251            destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
252        }
253        if (isUpdate("gcInactiveDestinations", includedProperties)) {
254            destination.setGcIfInactive(isGcInactiveDestinations());
255        }
256        if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
257            destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
258        }
259        if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) {
260            destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
261        }
262        if (isUpdate("reduceMemoryFootprint", includedProperties)) {
263            destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
264        }
265        if (isUpdate("doOptimizeMessageStore", includedProperties)) {
266            destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
267        }
268        if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) {
269            destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
270        }
271        if (isUpdate("advisoryForConsumed", includedProperties)) {
272            destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
273        }
274        if (isUpdate("advisoryForDelivery", includedProperties)) {
275            destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
276        }
277        if (isUpdate("advisoryForDiscardingMessages", includedProperties)) {
278            destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
279        }
280        if (isUpdate("advisoryForSlowConsumers", includedProperties)) {
281            destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
282        }
283        if (isUpdate("advisoryForFastProducers", includedProperties)) {
284            destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
285        }
286        if (isUpdate("advisoryWhenFull", includedProperties)) {
287            destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
288        }
289        if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
290            destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
291        }
292    }
293
294    public void baseConfiguration(Broker broker, BaseDestination destination) {
295        baseUpdate(destination);
296        destination.setEnableAudit(isEnableAudit());
297        destination.setMaxAuditDepth(getMaxQueueAuditDepth());
298        destination.setMaxProducersToAudit(getMaxProducersToAudit());
299        destination.setUseCache(isUseCache());
300        destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
301        SlowConsumerStrategy scs = getSlowConsumerStrategy();
302        if (scs != null) {
303            scs.setBrokerService(broker);
304            scs.addDestination(destination);
305        }
306        destination.setSlowConsumerStrategy(scs);
307        destination.setPrioritizedMessages(isPrioritizedMessages());
308        if (sendFailIfNoSpace != -1) {
309            destination.getSystemUsage().setSendFailIfNoSpace(isSendFailIfNoSpace());
310        }
311        if (sendFailIfNoSpaceAfterTimeout != -1) {
312            destination.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(getSendFailIfNoSpaceAfterTimeout());
313        }
314    }
315
316    public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
317        configurePrefetch(subscription);
318        subscription.setUsePrefetchExtension(isUsePrefetchExtension());
319        subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
320        if (pendingMessageLimitStrategy != null) {
321            int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
322            int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
323            if (consumerLimit > 0) {
324                if (value < 0 || consumerLimit < value) {
325                    value = consumerLimit;
326                }
327            }
328            if (value >= 0) {
329                LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId());
330                subscription.setMaximumPendingMessages(value);
331            }
332        }
333        if (messageEvictionStrategy != null) {
334            subscription.setMessageEvictionStrategy(messageEvictionStrategy);
335        }
336        if (pendingSubscriberPolicy != null) {
337            String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
338            int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
339            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
340        }
341        if (enableAudit) {
342            subscription.setEnableAudit(enableAudit);
343            subscription.setMaxProducersToAudit(maxProducersToAudit);
344            subscription.setMaxAuditDepth(maxAuditDepth);
345        }
346    }
347
348    public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
349        String clientId = sub.getSubscriptionKey().getClientId();
350        String subName = sub.getSubscriptionKey().getSubscriptionName();
351        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
352        configurePrefetch(sub);
353        if (pendingDurableSubscriberPolicy != null) {
354            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub);
355            cursor.setSystemUsage(memoryManager);
356            sub.setPending(cursor);
357        }
358        int auditDepth = getMaxAuditDepth();
359        if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) {
360            sub.setMaxAuditDepth(auditDepth * 10);
361        } else {
362            sub.setMaxAuditDepth(auditDepth);
363        }
364        sub.setMaxProducersToAudit(getMaxProducersToAudit());
365        sub.setUsePrefetchExtension(isUsePrefetchExtension());
366    }
367
368    public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
369        configurePrefetch(sub);
370        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
371        sub.setUsePrefetchExtension(isUsePrefetchExtension());
372
373        // TODO
374        // We currently need an infinite audit because of the way that browser dispatch
375        // is done.  We should refactor the browsers to better handle message dispatch so
376        // we can remove this and perform a more efficient dispatch.
377        sub.setMaxProducersToAudit(Integer.MAX_VALUE);
378        sub.setMaxAuditDepth(Short.MAX_VALUE);
379
380        // part solution - dispatching to browsers needs to be restricted
381        sub.setMaxMessages(getMaxBrowsePageSize());
382    }
383
384    public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
385        configurePrefetch(sub);
386        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
387        sub.setUsePrefetchExtension(isUsePrefetchExtension());
388        sub.setMaxProducersToAudit(getMaxProducersToAudit());
389    }
390
391    public void configurePrefetch(Subscription subscription) {
392
393        final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize();
394        if (subscription instanceof QueueBrowserSubscription) {
395            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) {
396                ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch());
397            }
398        } else if (subscription instanceof QueueSubscription) {
399            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) {
400                ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch());
401            }
402        } else if (subscription instanceof DurableTopicSubscription) {
403            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH ||
404                    subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) {
405                ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch());
406            }
407        } else if (subscription instanceof TopicSubscription) {
408            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) {
409                ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch());
410            }
411        }
412        if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) {
413            // tell the sub so that it can issue a pull request
414            subscription.updateConsumerPrefetch(0);
415        }
416    }
417
418    private boolean isUpdate(String property, Set<String> includedProperties) {
419        return includedProperties == null || includedProperties.contains(property);
420    }
421    // Properties
422    // -------------------------------------------------------------------------
423    public DispatchPolicy getDispatchPolicy() {
424        return dispatchPolicy;
425    }
426
427    public void setDispatchPolicy(DispatchPolicy policy) {
428        this.dispatchPolicy = policy;
429    }
430
431    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
432        return subscriptionRecoveryPolicy;
433    }
434
435    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
436        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
437    }
438
439    public boolean isSendAdvisoryIfNoConsumers() {
440        return sendAdvisoryIfNoConsumers;
441    }
442
443    /**
444     * Sends an advisory message if a non-persistent message is sent and there
445     * are no active consumers
446     */
447    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
448        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
449    }
450
451    public DeadLetterStrategy getDeadLetterStrategy() {
452        return deadLetterStrategy;
453    }
454
455    /**
456     * Sets the policy used to determine which dead letter queue destination
457     * should be used
458     */
459    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
460        this.deadLetterStrategy = deadLetterStrategy;
461    }
462
463    public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
464        return pendingMessageLimitStrategy;
465    }
466
467    /**
468     * Sets the strategy to calculate the maximum number of messages that are
469     * allowed to be pending on consumers (in addition to their prefetch sizes).
470     * Once the limit is reached, non-durable topics can then start discarding
471     * old messages. This allows us to keep dispatching messages to slow
472     * consumers while not blocking fast consumers and discarding the messages
473     * oldest first.
474     */
475    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
476        this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
477    }
478
479    public MessageEvictionStrategy getMessageEvictionStrategy() {
480        return messageEvictionStrategy;
481    }
482
483    /**
484     * Sets the eviction strategy used to decide which message to evict when the
485     * slow consumer needs to discard messages
486     */
487    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
488        this.messageEvictionStrategy = messageEvictionStrategy;
489    }
490
491    public long getMemoryLimit() {
492        return memoryLimit;
493    }
494
495    /**
496     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
497     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
498     */
499    public void setMemoryLimit(long memoryLimit) {
500        this.memoryLimit = memoryLimit;
501    }
502
503    public MessageGroupMapFactory getMessageGroupMapFactory() {
504        if (messageGroupMapFactory == null) {
505            try {
506            messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType());
507            }catch(Exception e){
508                LOG.error("Failed to create message group Factory ",e);
509            }
510        }
511        return messageGroupMapFactory;
512    }
513
514    /**
515     * Sets the factory used to create new instances of {MessageGroupMap} used
516     * to implement the <a
517     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
518     * functionality.
519     */
520    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
521        this.messageGroupMapFactory = messageGroupMapFactory;
522    }
523
524
525    public String getMessageGroupMapFactoryType() {
526        return messageGroupMapFactoryType;
527    }
528
529    public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) {
530        this.messageGroupMapFactoryType = messageGroupMapFactoryType;
531    }
532
533
534    /**
535     * @return the pendingDurableSubscriberPolicy
536     */
537    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
538        return this.pendingDurableSubscriberPolicy;
539    }
540
541    /**
542     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy
543     *                to set
544     */
545    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
546        this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy;
547    }
548
549    /**
550     * @return the pendingQueuePolicy
551     */
552    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
553        return this.pendingQueuePolicy;
554    }
555
556    /**
557     * @param pendingQueuePolicy the pendingQueuePolicy to set
558     */
559    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) {
560        this.pendingQueuePolicy = pendingQueuePolicy;
561    }
562
563    /**
564     * @return the pendingSubscriberPolicy
565     */
566    public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() {
567        return this.pendingSubscriberPolicy;
568    }
569
570    /**
571     * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
572     */
573    public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) {
574        this.pendingSubscriberPolicy = pendingSubscriberPolicy;
575    }
576
577    /**
578     * @return true if producer flow control enabled
579     */
580    public boolean isProducerFlowControl() {
581        return producerFlowControl;
582    }
583
584    /**
585     * @param producerFlowControl
586     */
587    public void setProducerFlowControl(boolean producerFlowControl) {
588        this.producerFlowControl = producerFlowControl;
589    }
590
591    /**
592     * @return true if topic is always retroactive
593     */
594    public boolean isAlwaysRetroactive() {
595        return alwaysRetroactive;
596    }
597
598    /**
599     * @param alwaysRetroactive
600     */
601    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
602        this.alwaysRetroactive = alwaysRetroactive;
603    }
604
605
606    /**
607     * Set's the interval at which warnings about producers being blocked by
608     * resource usage will be triggered. Values of 0 or less will disable
609     * warnings
610     *
611     * @param blockedProducerWarningInterval the interval at which warning about
612     *            blocked producers will be triggered.
613     */
614    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
615        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
616    }
617
618    /**
619     *
620     * @return the interval at which warning about blocked producers will be
621     *         triggered.
622     */
623    public long getBlockedProducerWarningInterval() {
624        return blockedProducerWarningInterval;
625    }
626
627    /**
628     * @return the maxProducersToAudit
629     */
630    public int getMaxProducersToAudit() {
631        return maxProducersToAudit;
632    }
633
634    /**
635     * @param maxProducersToAudit the maxProducersToAudit to set
636     */
637    public void setMaxProducersToAudit(int maxProducersToAudit) {
638        this.maxProducersToAudit = maxProducersToAudit;
639    }
640
641    /**
642     * @return the maxAuditDepth
643     */
644    public int getMaxAuditDepth() {
645        return maxAuditDepth;
646    }
647
648    /**
649     * @param maxAuditDepth the maxAuditDepth to set
650     */
651    public void setMaxAuditDepth(int maxAuditDepth) {
652        this.maxAuditDepth = maxAuditDepth;
653    }
654
655    /**
656     * @return the enableAudit
657     */
658    public boolean isEnableAudit() {
659        return enableAudit;
660    }
661
662    /**
663     * @param enableAudit the enableAudit to set
664     */
665    public void setEnableAudit(boolean enableAudit) {
666        this.enableAudit = enableAudit;
667    }
668
669    public int getMaxQueueAuditDepth() {
670        return maxQueueAuditDepth;
671    }
672
673    public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
674        this.maxQueueAuditDepth = maxQueueAuditDepth;
675    }
676
677    public boolean isOptimizedDispatch() {
678        return optimizedDispatch;
679    }
680
681    public void setOptimizedDispatch(boolean optimizedDispatch) {
682        this.optimizedDispatch = optimizedDispatch;
683    }
684
685    public int getMaxPageSize() {
686        return maxPageSize;
687    }
688
689    public void setMaxPageSize(int maxPageSize) {
690        this.maxPageSize = maxPageSize;
691    }
692
693    public int getMaxBrowsePageSize() {
694        return maxBrowsePageSize;
695    }
696
697    public void setMaxBrowsePageSize(int maxPageSize) {
698        this.maxBrowsePageSize = maxPageSize;
699    }
700
701    public boolean isUseCache() {
702        return useCache;
703    }
704
705    public void setUseCache(boolean useCache) {
706        this.useCache = useCache;
707    }
708
709    public long getMinimumMessageSize() {
710        return minimumMessageSize;
711    }
712
713    public void setMinimumMessageSize(long minimumMessageSize) {
714        this.minimumMessageSize = minimumMessageSize;
715    }
716
717    public boolean isUseConsumerPriority() {
718        return useConsumerPriority;
719    }
720
721    public void setUseConsumerPriority(boolean useConsumerPriority) {
722        this.useConsumerPriority = useConsumerPriority;
723    }
724
725    public boolean isStrictOrderDispatch() {
726        return strictOrderDispatch;
727    }
728
729    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
730        this.strictOrderDispatch = strictOrderDispatch;
731    }
732
733    public boolean isLazyDispatch() {
734        return lazyDispatch;
735    }
736
737    public void setLazyDispatch(boolean lazyDispatch) {
738        this.lazyDispatch = lazyDispatch;
739    }
740
741    public int getTimeBeforeDispatchStarts() {
742        return timeBeforeDispatchStarts;
743    }
744
745    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
746        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
747    }
748
749    public int getConsumersBeforeDispatchStarts() {
750        return consumersBeforeDispatchStarts;
751    }
752
753    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
754        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
755    }
756
757    /**
758     * @return the advisoryForSlowConsumers
759     */
760    public boolean isAdvisoryForSlowConsumers() {
761        return advisoryForSlowConsumers;
762    }
763
764    /**
765     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
766     */
767    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
768        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
769    }
770
771    /**
772     * @return the advisoryForDiscardingMessages
773     */
774    public boolean isAdvisoryForDiscardingMessages() {
775        return advisoryForDiscardingMessages;
776    }
777
778    /**
779     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
780     */
781    public void setAdvisoryForDiscardingMessages(
782            boolean advisoryForDiscardingMessages) {
783        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
784    }
785
786    /**
787     * @return the advisoryWhenFull
788     */
789    public boolean isAdvisoryWhenFull() {
790        return advisoryWhenFull;
791    }
792
793    /**
794     * @param advisoryWhenFull the advisoryWhenFull to set
795     */
796    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
797        this.advisoryWhenFull = advisoryWhenFull;
798    }
799
800    /**
801     * @return the advisoryForDelivery
802     */
803    public boolean isAdvisoryForDelivery() {
804        return advisoryForDelivery;
805    }
806
807    /**
808     * @param advisoryForDelivery the advisoryForDelivery to set
809     */
810    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
811        this.advisoryForDelivery = advisoryForDelivery;
812    }
813
814    /**
815     * @return the advisoryForConsumed
816     */
817    public boolean isAdvisoryForConsumed() {
818        return advisoryForConsumed;
819    }
820
821    /**
822     * @param advisoryForConsumed the advisoryForConsumed to set
823     */
824    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
825        this.advisoryForConsumed = advisoryForConsumed;
826    }
827
828    /**
829     * @return the advisdoryForFastProducers
830     */
831    public boolean isAdvisoryForFastProducers() {
832        return advisoryForFastProducers;
833    }
834
835    /**
836     * @param advisoryForFastProducers the advisdoryForFastProducers to set
837     */
838    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
839        this.advisoryForFastProducers = advisoryForFastProducers;
840    }
841
842    public void setMaxExpirePageSize(int maxExpirePageSize) {
843        this.maxExpirePageSize = maxExpirePageSize;
844    }
845
846    public int getMaxExpirePageSize() {
847        return maxExpirePageSize;
848    }
849
850    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
851        this.expireMessagesPeriod = expireMessagesPeriod;
852    }
853
854    public long getExpireMessagesPeriod() {
855        return expireMessagesPeriod;
856    }
857
858    /**
859     * Get the queuePrefetch
860     * @return the queuePrefetch
861     */
862    public int getQueuePrefetch() {
863        return this.queuePrefetch;
864    }
865
866    /**
867     * Set the queuePrefetch
868     * @param queuePrefetch the queuePrefetch to set
869     */
870    public void setQueuePrefetch(int queuePrefetch) {
871        this.queuePrefetch = queuePrefetch;
872    }
873
874    /**
875     * Get the queueBrowserPrefetch
876     * @return the queueBrowserPrefetch
877     */
878    public int getQueueBrowserPrefetch() {
879        return this.queueBrowserPrefetch;
880    }
881
882    /**
883     * Set the queueBrowserPrefetch
884     * @param queueBrowserPrefetch the queueBrowserPrefetch to set
885     */
886    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
887        this.queueBrowserPrefetch = queueBrowserPrefetch;
888    }
889
890    /**
891     * Get the topicPrefetch
892     * @return the topicPrefetch
893     */
894    public int getTopicPrefetch() {
895        return this.topicPrefetch;
896    }
897
898    /**
899     * Set the topicPrefetch
900     * @param topicPrefetch the topicPrefetch to set
901     */
902    public void setTopicPrefetch(int topicPrefetch) {
903        this.topicPrefetch = topicPrefetch;
904    }
905
906    /**
907     * Get the durableTopicPrefetch
908     * @return the durableTopicPrefetch
909     */
910    public int getDurableTopicPrefetch() {
911        return this.durableTopicPrefetch;
912    }
913
914    /**
915     * Set the durableTopicPrefetch
916     * @param durableTopicPrefetch the durableTopicPrefetch to set
917     */
918    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
919        this.durableTopicPrefetch = durableTopicPrefetch;
920    }
921
922    public boolean isUsePrefetchExtension() {
923        return this.usePrefetchExtension;
924    }
925
926    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
927        this.usePrefetchExtension = usePrefetchExtension;
928    }
929
930    public int getCursorMemoryHighWaterMark() {
931        return this.cursorMemoryHighWaterMark;
932    }
933
934    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
935        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
936    }
937
938    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
939        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
940    }
941
942    public int getStoreUsageHighWaterMark() {
943        return storeUsageHighWaterMark;
944    }
945
946    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
947        this.slowConsumerStrategy = slowConsumerStrategy;
948    }
949
950    public SlowConsumerStrategy getSlowConsumerStrategy() {
951        return this.slowConsumerStrategy;
952    }
953
954
955    public boolean isPrioritizedMessages() {
956        return this.prioritizedMessages;
957    }
958
959    public void setPrioritizedMessages(boolean prioritizedMessages) {
960        this.prioritizedMessages = prioritizedMessages;
961    }
962
963    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
964        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
965    }
966
967    public boolean isAllConsumersExclusiveByDefault() {
968        return allConsumersExclusiveByDefault;
969    }
970
971    public boolean isGcInactiveDestinations() {
972        return this.gcInactiveDestinations;
973    }
974
975    public void setGcInactiveDestinations(boolean gcInactiveDestinations) {
976        this.gcInactiveDestinations = gcInactiveDestinations;
977    }
978
979    /**
980     * @return the amount of time spent inactive before GC of the destination kicks in.
981     *
982     * @deprecated use getInactiveTimeoutBeforeGC instead.
983     */
984    @Deprecated
985    public long getInactiveTimoutBeforeGC() {
986        return getInactiveTimeoutBeforeGC();
987    }
988
989    /**
990     * Sets the amount of time a destination is inactive before it is marked for GC
991     *
992     * @param inactiveTimoutBeforeGC
993     *        time in milliseconds to configure as the inactive timeout.
994     *
995     * @deprecated use getInactiveTimeoutBeforeGC instead.
996     */
997    @Deprecated
998    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
999        setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC);
1000    }
1001
1002    /**
1003     * @return the amount of time spent inactive before GC of the destination kicks in.
1004     */
1005    public long getInactiveTimeoutBeforeGC() {
1006        return this.inactiveTimeoutBeforeGC;
1007    }
1008
1009    /**
1010     * Sets the amount of time a destination is inactive before it is marked for GC
1011     *
1012     * @param inactiveTimoutBeforeGC
1013     *        time in milliseconds to configure as the inactive timeout.
1014     */
1015    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
1016        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
1017    }
1018
1019    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
1020        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
1021    }
1022
1023    public boolean isGcWithNetworkConsumers() {
1024        return gcWithNetworkConsumers;
1025    }
1026
1027    public boolean isReduceMemoryFootprint() {
1028        return reduceMemoryFootprint;
1029    }
1030
1031    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
1032        this.reduceMemoryFootprint = reduceMemoryFootprint;
1033    }
1034
1035    public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) {
1036        this.networkBridgeFilterFactory = networkBridgeFilterFactory;
1037    }
1038
1039    public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
1040        return networkBridgeFilterFactory;
1041    }
1042
1043    public boolean isDoOptimzeMessageStorage() {
1044        return doOptimzeMessageStorage;
1045    }
1046
1047    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
1048        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
1049    }
1050
1051    public int getOptimizeMessageStoreInFlightLimit() {
1052        return optimizeMessageStoreInFlightLimit;
1053    }
1054
1055    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
1056        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
1057    }
1058
1059    public void setPersistJMSRedelivered(boolean val) {
1060        this.persistJMSRedelivered = val;
1061    }
1062
1063    public boolean isPersistJMSRedelivered() {
1064        return persistJMSRedelivered;
1065    }
1066
1067    @Override
1068    public String toString() {
1069        return "PolicyEntry [" + destination + "]";
1070    }
1071
1072    public void setSendFailIfNoSpace(boolean val) {
1073        if (val) {
1074            this.sendFailIfNoSpace = 1;
1075        } else {
1076            this.sendFailIfNoSpace = 0;
1077        }
1078    }
1079
1080    public boolean isSendFailIfNoSpace() {
1081        return sendFailIfNoSpace == 1;
1082    }
1083
1084    public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
1085        this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
1086    }
1087
1088    public long getSendFailIfNoSpaceAfterTimeout() {
1089        return this.sendFailIfNoSpaceAfterTimeout;
1090    }
1091}