001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.usage;
018
019import java.util.LinkedList;
020import java.util.List;
021import java.util.concurrent.CopyOnWriteArrayList;
022import java.util.concurrent.ThreadPoolExecutor;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.locks.Condition;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import org.apache.activemq.Service;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
034 * Main use case is manage memory usage.
035 *
036 * @org.apache.xbean.XBean
037 *
038 */
039public abstract class Usage<T extends Usage> implements Service {
040
041    private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
042
043    protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
044    protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
045    protected int percentUsage;
046    protected T parent;
047    protected String name;
048
049    private UsageCapacity limiter = new DefaultUsageCapacity();
050    private int percentUsageMinDelta = 1;
051    private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
052    private final boolean debug = LOG.isDebugEnabled();
053    private float usagePortion = 1.0f;
054    private final List<T> children = new CopyOnWriteArrayList<T>();
055    private final List<Runnable> callbacks = new LinkedList<Runnable>();
056    private int pollingTime = 100;
057    private final AtomicBoolean started = new AtomicBoolean();
058    private ThreadPoolExecutor executor;
059
060    public Usage(T parent, String name, float portion) {
061        this.parent = parent;
062        this.usagePortion = portion;
063        if (parent != null) {
064            this.limiter.setLimit((long) (parent.getLimit() * (double)portion));
065            name = parent.name + ":" + name;
066        }
067        this.name = name;
068    }
069
070    protected abstract long retrieveUsage();
071
072    /**
073     * @throws InterruptedException
074     */
075    public void waitForSpace() throws InterruptedException {
076        waitForSpace(0);
077    }
078
079    public boolean waitForSpace(long timeout) throws InterruptedException {
080        return waitForSpace(timeout, 100);
081    }
082
083    /**
084     * @param timeout
085     * @throws InterruptedException
086     * @return true if space
087     */
088    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
089        if (parent != null) {
090            if (!parent.waitForSpace(timeout, highWaterMark)) {
091                return false;
092            }
093        }
094        usageLock.writeLock().lock();
095        try {
096            percentUsage = caclPercentUsage();
097            if (percentUsage >= highWaterMark) {
098                long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
099                long timeleft = deadline;
100                while (timeleft > 0) {
101                    percentUsage = caclPercentUsage();
102                    if (percentUsage >= highWaterMark) {
103                        waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS);
104                        timeleft = deadline - System.currentTimeMillis();
105                    } else {
106                        break;
107                    }
108                }
109            }
110            return percentUsage < highWaterMark;
111        } finally {
112            usageLock.writeLock().unlock();
113        }
114    }
115
116    public boolean isFull() {
117        return isFull(100);
118    }
119
120    public boolean isFull(int highWaterMark) {
121        if (parent != null && parent.isFull(highWaterMark)) {
122            return true;
123        }
124        usageLock.writeLock().lock();
125        try {
126            percentUsage = caclPercentUsage();
127            return percentUsage >= highWaterMark;
128        } finally {
129            usageLock.writeLock().unlock();
130        }
131    }
132
133    public void addUsageListener(UsageListener listener) {
134        listeners.add(listener);
135    }
136
137    public void removeUsageListener(UsageListener listener) {
138        listeners.remove(listener);
139    }
140
141    public long getLimit() {
142        usageLock.readLock().lock();
143        try {
144            return limiter.getLimit();
145        } finally {
146            usageLock.readLock().unlock();
147        }
148    }
149
150    /**
151     * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
152     * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
153     * "1g" can be used
154     *
155     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
156     */
157    public void setLimit(long limit) {
158        if (percentUsageMinDelta < 0) {
159            throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
160        }
161        usageLock.writeLock().lock();
162        try {
163            this.limiter.setLimit(limit);
164            this.usagePortion = 0;
165        } finally {
166            usageLock.writeLock().unlock();
167        }
168        onLimitChange();
169    }
170
171    protected void onLimitChange() {
172        // We may need to calculate the limit
173        if (usagePortion > 0 && parent != null) {
174            usageLock.writeLock().lock();
175            try {
176                this.limiter.setLimit((long) (parent.getLimit() * (double) usagePortion));
177            } finally {
178                usageLock.writeLock().unlock();
179            }
180        }
181        // Reset the percent currently being used.
182        usageLock.writeLock().lock();
183        try {
184            setPercentUsage(caclPercentUsage());
185        } finally {
186            usageLock.writeLock().unlock();
187        }
188        // Let the children know that the limit has changed. They may need to
189        // set their limits based on ours.
190        for (T child : children) {
191            child.onLimitChange();
192        }
193    }
194
195    public float getUsagePortion() {
196        usageLock.readLock().lock();
197        try {
198            return usagePortion;
199        } finally {
200            usageLock.readLock().unlock();
201        }
202    }
203
204    public void setUsagePortion(float usagePortion) {
205        usageLock.writeLock().lock();
206        try {
207            this.usagePortion = usagePortion;
208        } finally {
209            usageLock.writeLock().unlock();
210        }
211        onLimitChange();
212    }
213
214    public int getPercentUsage() {
215        usageLock.readLock().lock();
216        try {
217            return percentUsage;
218        } finally {
219            usageLock.readLock().unlock();
220        }
221    }
222
223    public int getPercentUsageMinDelta() {
224        usageLock.readLock().lock();
225        try {
226            return percentUsageMinDelta;
227        } finally {
228            usageLock.readLock().unlock();
229        }
230    }
231
232    /**
233     * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
234     * manager.
235     *
236     * @param percentUsageMinDelta
237     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
238     */
239    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
240        if (percentUsageMinDelta < 1) {
241            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
242        }
243
244        usageLock.writeLock().lock();
245        try {
246            this.percentUsageMinDelta = percentUsageMinDelta;
247            setPercentUsage(caclPercentUsage());
248        } finally {
249            usageLock.writeLock().unlock();
250        }
251    }
252
253    public long getUsage() {
254        usageLock.readLock().lock();
255        try {
256            return retrieveUsage();
257        } finally {
258            usageLock.readLock().unlock();
259        }
260    }
261
262    protected void setPercentUsage(int value) {
263        usageLock.writeLock().lock();
264        try {
265            int oldValue = percentUsage;
266            percentUsage = value;
267            if (oldValue != value) {
268                fireEvent(oldValue, value);
269            }
270        } finally {
271            usageLock.writeLock().unlock();
272        }
273    }
274
275    protected int caclPercentUsage() {
276        if (limiter.getLimit() == 0) {
277            return 0;
278        }
279        return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
280    }
281
282    // Must be called with the usage lock's writeLock held.
283    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
284        if (debug) {
285            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
286        }
287        if (started.get()) {
288            // Switching from being full to not being full..
289            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
290                waitForSpaceCondition.signalAll();
291                if (!callbacks.isEmpty()) {
292                    for (Runnable callback : callbacks) {
293                        getExecutor().execute(callback);
294                    }
295                    callbacks.clear();
296                }
297            }
298            if (!listeners.isEmpty()) {
299                // Let the listeners know on a separate thread
300                Runnable listenerNotifier = new Runnable() {
301                    @Override
302                    public void run() {
303                        for (UsageListener listener : listeners) {
304                            listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
305                        }
306                    }
307                };
308                if (started.get()) {
309                    getExecutor().execute(listenerNotifier);
310                } else {
311                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
312                }
313            }
314        }
315    }
316
317    public String getName() {
318        return name;
319    }
320
321    @Override
322    public String toString() {
323        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
324            + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
325    }
326
327    @Override
328    @SuppressWarnings("unchecked")
329    public void start() {
330        if (started.compareAndSet(false, true)) {
331            if (parent != null) {
332                parent.addChild(this);
333                if (getLimit() > parent.getLimit()) {
334                    LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
335                }
336            }
337            for (T t : children) {
338                t.start();
339            }
340        }
341    }
342
343    @Override
344    @SuppressWarnings("unchecked")
345    public void stop() {
346        if (started.compareAndSet(true, false)) {
347            if (parent != null) {
348                parent.removeChild(this);
349            }
350
351            // clear down any callbacks
352            usageLock.writeLock().lock();
353            try {
354                waitForSpaceCondition.signalAll();
355                for (Runnable callback : this.callbacks) {
356                    callback.run();
357                }
358                this.callbacks.clear();
359            } finally {
360                usageLock.writeLock().unlock();
361            }
362
363            for (T t : children) {
364                t.stop();
365            }
366        }
367    }
368
369    protected void addChild(T child) {
370        children.add(child);
371        if (started.get()) {
372            child.start();
373        }
374    }
375
376    protected void removeChild(T child) {
377        children.remove(child);
378    }
379
380    /**
381     * @param callback
382     * @return true if the UsageManager was full. The callback will only be called if this method returns true.
383     */
384    public boolean notifyCallbackWhenNotFull(final Runnable callback) {
385        if (parent != null) {
386            Runnable r = new Runnable() {
387
388                @Override
389                public void run() {
390                    usageLock.writeLock().lock();
391                    try {
392                        if (percentUsage >= 100) {
393                            callbacks.add(callback);
394                        } else {
395                            callback.run();
396                        }
397                    } finally {
398                        usageLock.writeLock().unlock();
399                    }
400                }
401            };
402            if (parent.notifyCallbackWhenNotFull(r)) {
403                return true;
404            }
405        }
406        usageLock.writeLock().lock();
407        try {
408            if (percentUsage >= 100) {
409                callbacks.add(callback);
410                return true;
411            } else {
412                return false;
413            }
414        } finally {
415            usageLock.writeLock().unlock();
416        }
417    }
418
419    /**
420     * @return the limiter
421     */
422    public UsageCapacity getLimiter() {
423        return this.limiter;
424    }
425
426    /**
427     * @param limiter
428     *            the limiter to set
429     */
430    public void setLimiter(UsageCapacity limiter) {
431        this.limiter = limiter;
432    }
433
434    /**
435     * @return the pollingTime
436     */
437    public int getPollingTime() {
438        return this.pollingTime;
439    }
440
441    /**
442     * @param pollingTime
443     *            the pollingTime to set
444     */
445    public void setPollingTime(int pollingTime) {
446        this.pollingTime = pollingTime;
447    }
448
449    public void setName(String name) {
450        this.name = name;
451    }
452
453    public T getParent() {
454        return parent;
455    }
456
457    public void setParent(T parent) {
458        this.parent = parent;
459    }
460
461    public void setExecutor(ThreadPoolExecutor executor) {
462        this.executor = executor;
463    }
464
465    public ThreadPoolExecutor getExecutor() {
466        return executor;
467    }
468
469    public boolean isStarted() {
470        return started.get();
471    }
472}