001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.Serializable; 020 021import org.slf4j.Logger; 022import org.slf4j.LoggerFactory; 023 024/** 025 * Defines the prefetch message policies for different types of consumers 026 * 027 * @org.apache.xbean.XBean element="prefetchPolicy" 028 * 029 */ 030@SuppressWarnings("serial") 031public class ActiveMQPrefetchPolicy extends Object implements Serializable { 032 033 public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE; 034 public static final int DEFAULT_QUEUE_PREFETCH = 1000; 035 public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500; 036 public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100; 037 public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH = 1000; 038 public static final int DEFAULT_INPUT_STREAM_PREFETCH = 100; 039 public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE; 040 041 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPrefetchPolicy.class); 042 043 private int queuePrefetch; 044 private int queueBrowserPrefetch; 045 private int topicPrefetch; 046 private int durableTopicPrefetch; 047 private int optimizeDurableTopicPrefetch; 048 private int inputStreamPrefetch; 049 private int maximumPendingMessageLimit; 050 051 /** 052 * Initialize default prefetch policies 053 */ 054 public ActiveMQPrefetchPolicy() { 055 this.queuePrefetch = DEFAULT_QUEUE_PREFETCH; 056 this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH; 057 this.topicPrefetch = DEFAULT_TOPIC_PREFETCH; 058 this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH; 059 this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH; 060 this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH; 061 } 062 063 /** 064 * @return Returns the durableTopicPrefetch. 065 */ 066 public int getDurableTopicPrefetch() { 067 return durableTopicPrefetch; 068 } 069 070 /** 071 * @param durableTopicPrefetch 072 * The durableTopicPrefetch to set. 073 */ 074 public void setDurableTopicPrefetch(int durableTopicPrefetch) { 075 this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch); 076 } 077 078 /** 079 * @return Returns the queuePrefetch. 080 */ 081 public int getQueuePrefetch() { 082 return queuePrefetch; 083 } 084 085 /** 086 * @param queuePrefetch 087 * The queuePrefetch to set. 088 */ 089 public void setQueuePrefetch(int queuePrefetch) { 090 this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch); 091 } 092 093 /** 094 * @return Returns the queueBrowserPrefetch. 095 */ 096 public int getQueueBrowserPrefetch() { 097 return queueBrowserPrefetch; 098 } 099 100 /** 101 * @param queueBrowserPrefetch 102 * The queueBrowserPrefetch to set. 103 */ 104 public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { 105 this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch); 106 } 107 108 /** 109 * @return Returns the topicPrefetch. 110 */ 111 public int getTopicPrefetch() { 112 return topicPrefetch; 113 } 114 115 /** 116 * @param topicPrefetch 117 * The topicPrefetch to set. 118 */ 119 public void setTopicPrefetch(int topicPrefetch) { 120 this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); 121 } 122 123 /** 124 * @return Returns the optimizeDurableTopicPrefetch. 125 */ 126 public int getOptimizeDurableTopicPrefetch() { 127 return optimizeDurableTopicPrefetch; 128 } 129 130 /** 131 * @param optimizeAcknowledgePrefetch 132 * The optimizeDurableTopicPrefetch to set. 133 */ 134 public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) { 135 this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch; 136 } 137 138 public int getMaximumPendingMessageLimit() { 139 return maximumPendingMessageLimit; 140 } 141 142 /** 143 * Sets how many messages a broker will keep around, above the prefetch 144 * limit, for non-durable topics before starting to discard older messages. 145 */ 146 public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) { 147 this.maximumPendingMessageLimit = maximumPendingMessageLimit; 148 } 149 150 private int getMaxPrefetchLimit(int value) { 151 int result = Math.min(value, MAX_PREFETCH_SIZE); 152 if (result < value) { 153 LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE); 154 } 155 return result; 156 } 157 158 public void setAll(int i) { 159 this.durableTopicPrefetch = getMaxPrefetchLimit(i); 160 this.queueBrowserPrefetch = getMaxPrefetchLimit(i); 161 this.queuePrefetch = getMaxPrefetchLimit(i); 162 this.topicPrefetch = getMaxPrefetchLimit(i); 163 this.inputStreamPrefetch = getMaxPrefetchLimit(i); 164 this.optimizeDurableTopicPrefetch = getMaxPrefetchLimit(i); 165 } 166 167 @Deprecated 168 public int getInputStreamPrefetch() { 169 return inputStreamPrefetch; 170 } 171 172 @Deprecated 173 public void setInputStreamPrefetch(int inputStreamPrefetch) { 174 this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch); 175 } 176 177 @Override 178 public boolean equals(Object object) { 179 if (object instanceof ActiveMQPrefetchPolicy) { 180 ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object; 181 return this.queuePrefetch == other.queuePrefetch && 182 this.queueBrowserPrefetch == other.queueBrowserPrefetch && 183 this.topicPrefetch == other.topicPrefetch && 184 this.durableTopicPrefetch == other.durableTopicPrefetch && 185 this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch && 186 this.inputStreamPrefetch == other.inputStreamPrefetch; 187 } 188 return false; 189 } 190}