/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.usage;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Used to keep track of how much of something is being used so that a productive working set usage can be controlled.
 * Main use case is managing memory usage.
 * <p>
 * Usages form a tree: a child may derive its limit from a portion of its parent's limit, and "is there space"
 * queries consult the parent before the local state. Mutable usage state is guarded by {@link #usageLock}.
 *
 * @org.apache.xbean.XBean
 *
 */
public abstract class Usage<T extends Usage> implements Service {

    private static final Logger LOG = LoggerFactory.getLogger(Usage.class);

    /** Guards {@link #percentUsage}, the limit, the usage portion and the pending callbacks. */
    protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
    /** Signalled (under the write lock) when usage drops from full to not full, and on {@link #stop()}. */
    protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition();
    /** Last computed usage percentage, already rounded down to a multiple of {@code percentUsageMinDelta}. */
    protected int percentUsage;
    protected T parent;
    protected String name;

    private UsageCapacity limiter = new DefaultUsageCapacity();
    private int percentUsageMinDelta = 1;
    private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
    private float usagePortion = 1.0f;
    private final List<T> children = new CopyOnWriteArrayList<T>();
    // Callbacks waiting for the usage to become not full; only touched while holding the write lock.
    private final List<Runnable> callbacks = new LinkedList<Runnable>();
    // Maximum time, in milliseconds, of a single wait inside waitForSpace before usage is re-sampled.
    private int pollingTime = 100;
    private final AtomicBoolean started = new AtomicBoolean();
    private ThreadPoolExecutor executor;

    /**
     * Creates a usage tracker.
     *
     * @param parent
     *            optional parent; when non-null the initial limit is {@code parent.getLimit() * portion} and the
     *            name is prefixed with the parent's name and a colon
     * @param name
     *            name of this usage
     * @param portion
     *            fraction of the parent's limit assigned to this usage
     */
    public Usage(T parent, String name, float portion) {
        this.parent = parent;
        this.usagePortion = portion;
        if (parent != null) {
            this.limiter.setLimit((long) (parent.getLimit() * (double) portion));
            name = parent.name + ":" + name;
        }
        this.name = name;
    }

    /**
     * @return the current raw usage, in the same unit as the limit
     */
    protected abstract long retrieveUsage();

    /**
     * Blocks until usage is below 100%, without a timeout.
     *
     * @throws InterruptedException
     *             if the calling thread is interrupted while waiting
     */
    public void waitForSpace() throws InterruptedException {
        waitForSpace(0);
    }

    /**
     * Blocks until usage is below 100% or the timeout elapses.
     *
     * @param timeout
     *            maximum wait in milliseconds; a value {@code <= 0} waits without a time bound
     * @return true if space is available
     * @throws InterruptedException
     *             if the calling thread is interrupted while waiting
     */
    public boolean waitForSpace(long timeout) throws InterruptedException {
        return waitForSpace(timeout, 100);
    }

    /**
     * Blocks until the usage percentage is below {@code highWaterMark} or the timeout elapses. The parent, if any,
     * is waited on first (with the same timeout) and a failure there short-circuits to {@code false}.
     *
     * @param timeout
     *            maximum wait in milliseconds; a value {@code <= 0} waits without a time bound
     * @param highWaterMark
     *            percentage at or above which the usage is considered to have no space
     * @throws InterruptedException
     *             if the calling thread is interrupted while waiting
     * @return true if space
     */
    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
        if (parent != null) {
            if (!parent.waitForSpace(timeout, highWaterMark)) {
                return false;
            }
        }
        usageLock.writeLock().lock();
        try {
            percentUsage = caclPercentUsage();
            if (percentUsage >= highWaterMark) {
                final long now = System.currentTimeMillis();
                // Treat timeouts that would overflow "now + timeout" as unbounded instead of letting the
                // deadline wrap negative (which would make the wait return immediately).
                final boolean bounded = timeout > 0 && timeout < Long.MAX_VALUE - now;
                final long deadline = bounded ? now + timeout : Long.MAX_VALUE;
                long timeleft = bounded ? timeout : Long.MAX_VALUE;
                while (percentUsage >= highWaterMark && timeleft > 0) {
                    // Never sleep past the deadline: cap each poll at the time remaining.
                    waitForSpaceCondition.await(Math.min(pollingTime, timeleft), TimeUnit.MILLISECONDS);
                    timeleft = deadline - System.currentTimeMillis();
                    // Re-sample after every wait so the result reflects space freed during the last wait.
                    percentUsage = caclPercentUsage();
                }
            }
            return percentUsage < highWaterMark;
        } finally {
            usageLock.writeLock().unlock();
        }
    }

    /**
     * @return true if this usage or any ancestor is at or above 100%
     */
    public boolean isFull() {
        return isFull(100);
    }

    /**
     * @param highWaterMark
     *            percentage at or above which the usage is considered full
     * @return true if the parent reports full for the same mark, or the freshly computed local percentage is at or
     *         above {@code highWaterMark}
     */
    public boolean isFull(int highWaterMark) {
        if (parent != null && parent.isFull(highWaterMark)) {
            return true;
        }
        // Write lock (not read lock) because percentUsage is refreshed as a side effect.
        usageLock.writeLock().lock();
        try {
            percentUsage = caclPercentUsage();
            return percentUsage >= highWaterMark;
        } finally {
            usageLock.writeLock().unlock();
        }
    }

    public void addUsageListener(UsageListener listener) {
        listeners.add(listener);
    }

    public void removeUsageListener(UsageListener listener) {
        listeners.remove(listener);
    }

    /**
     * @return the current limit as reported by the limiter
     */
    public long getLimit() {
        usageLock.readLock().lock();
        try {
            return limiter.getLimit();
        } finally {
            usageLock.readLock().unlock();
        }
    }

    /**
     * Sets the memory limit in bytes. Setting the limit in bytes will set the usagePortion to 0 since the UsageManager
     * is not going to be portion based off the parent. When set using Xbean, values of the form "20 Mb", "1024kb", and
     * "1g" can be used
     *
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setLimit(long limit) {
        // NOTE(review): this validates the percentUsageMinDelta field, not the 'limit' argument; kept as-is.
        if (percentUsageMinDelta < 0) {
            throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
        }
        usageLock.writeLock().lock();
        try {
            this.limiter.setLimit(limit);
            this.usagePortion = 0;
        } finally {
            usageLock.writeLock().unlock();
        }
        onLimitChange();
    }

    /**
     * Recomputes the portion-based limit (when a portion and a parent are set), refreshes the usage percentage and
     * then propagates the change to all children.
     */
    protected void onLimitChange() {
        // We may need to calculate the limit
        if (usagePortion > 0 && parent != null) {
            usageLock.writeLock().lock();
            try {
                this.limiter.setLimit((long) (parent.getLimit() * (double) usagePortion));
            } finally {
                usageLock.writeLock().unlock();
            }
        }
        // Reset the percent currently being used.
        usageLock.writeLock().lock();
        try {
            setPercentUsage(caclPercentUsage());
        } finally {
            usageLock.writeLock().unlock();
        }
        // Let the children know that the limit has changed. They may need to
        // set their limits based on ours.
        for (T child : children) {
            child.onLimitChange();
        }
    }

    public float getUsagePortion() {
        usageLock.readLock().lock();
        try {
            return usagePortion;
        } finally {
            usageLock.readLock().unlock();
        }
    }

    /**
     * Sets the fraction of the parent's limit used by this usage and recomputes the limit accordingly.
     *
     * @param usagePortion
     *            the new portion
     */
    public void setUsagePortion(float usagePortion) {
        usageLock.writeLock().lock();
        try {
            this.usagePortion = usagePortion;
        } finally {
            usageLock.writeLock().unlock();
        }
        onLimitChange();
    }

    /**
     * @return the last computed usage percentage (not recomputed by this call)
     */
    public int getPercentUsage() {
        usageLock.readLock().lock();
        try {
            return percentUsage;
        } finally {
            usageLock.readLock().unlock();
        }
    }

    public int getPercentUsageMinDelta() {
        usageLock.readLock().lock();
        try {
            return percentUsageMinDelta;
        } finally {
            usageLock.readLock().unlock();
        }
    }

    /**
     * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the
     * manager.
     *
     * @param percentUsageMinDelta
     *            the new minimum delta; must be at least 1
     * @throws IllegalArgumentException
     *             if {@code percentUsageMinDelta} is less than 1
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     */
    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
        if (percentUsageMinDelta < 1) {
            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
        }

        usageLock.writeLock().lock();
        try {
            this.percentUsageMinDelta = percentUsageMinDelta;
            setPercentUsage(caclPercentUsage());
        } finally {
            usageLock.writeLock().unlock();
        }
    }

    /**
     * @return the current raw usage, read under the read lock
     */
    public long getUsage() {
        usageLock.readLock().lock();
        try {
            return retrieveUsage();
        } finally {
            usageLock.readLock().unlock();
        }
    }

    /**
     * Stores a new usage percentage and fires a change event when it differs from the previous value.
     *
     * @param value
     *            the new percentage
     */
    protected void setPercentUsage(int value) {
        usageLock.writeLock().lock();
        try {
            int oldValue = percentUsage;
            percentUsage = value;
            if (oldValue != value) {
                fireEvent(oldValue, value);
            }
        } finally {
            usageLock.writeLock().unlock();
        }
    }

    /**
     * Computes the usage percentage from {@link #retrieveUsage()} and the limiter's limit, rounded down to a
     * multiple of {@code percentUsageMinDelta}. (The method name's spelling is kept for compatibility.)
     *
     * @return 0 when the limit is 0, otherwise the rounded percentage
     */
    protected int caclPercentUsage() {
        if (limiter.getLimit() == 0) {
            return 0;
        }
        // Integer division twice: first to a whole percentage, then down to the nearest multiple of the delta.
        return (int) ((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
    }

    // Must be called with the usage lock's writeLock held.
    // NOTE(review): getExecutor() is dereferenced without a null check; an executor presumably has to be set
    // before this usage is started - confirm against callers.
    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
        // Check the level at call time so that runtime log-level changes are honoured.
        if (LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory");
        }
        if (started.get()) {
            // Switching from being full to not being full..
            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
                waitForSpaceCondition.signalAll();
                if (!callbacks.isEmpty()) {
                    for (Runnable callback : callbacks) {
                        getExecutor().execute(callback);
                    }
                    callbacks.clear();
                }
            }
            if (!listeners.isEmpty()) {
                // Let the listeners know on a separate thread
                Runnable listenerNotifier = new Runnable() {
                    @Override
                    public void run() {
                        for (UsageListener listener : listeners) {
                            listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
                        }
                    }
                };
                // Re-check: stop() may have flipped the flag since the outer test.
                if (started.get()) {
                    getExecutor().execute(listenerNotifier);
                } else {
                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
                }
            }
        }
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
                + ", percentUsageMinDelta=" + percentUsageMinDelta + "%" + (parent != null ? ";Parent:" + parent.toString() : "");
    }

    /**
     * Starts this usage (once): registers with the parent, logs when the local limit exceeds the parent's, and
     * starts all children.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void start() {
        if (started.compareAndSet(false, true)) {
            if (parent != null) {
                parent.addChild(this);
                if (getLimit() > parent.getLimit()) {
                    LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() });
                }
            }
            for (T t : children) {
                t.start();
            }
        }
    }

    /**
     * Stops this usage (once): unregisters from the parent, wakes all waiters, runs and clears any pending
     * callbacks on the calling thread, and stops all children.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void stop() {
        if (started.compareAndSet(true, false)) {
            if (parent != null) {
                parent.removeChild(this);
            }

            // clear down any callbacks
            usageLock.writeLock().lock();
            try {
                waitForSpaceCondition.signalAll();
                for (Runnable callback : this.callbacks) {
                    callback.run();
                }
                this.callbacks.clear();
            } finally {
                usageLock.writeLock().unlock();
            }

            for (T t : children) {
                t.stop();
            }
        }
    }

    /**
     * Registers a child and starts it immediately when this usage is already started.
     *
     * @param child
     *            the child to add
     */
    protected void addChild(T child) {
        children.add(child);
        if (started.get()) {
            child.start();
        }
    }

    protected void removeChild(T child) {
        children.remove(child);
    }

    /**
     * Registers a callback to run once the usage is no longer full. The parent is consulted first: when the parent
     * is full, a wrapper is queued there that re-checks this usage once the parent frees up.
     *
     * @param callback
     *            the action to run when space becomes available
     * @return true if the UsageManager was full. The callback will only be called if this method returns true.
     */
    public boolean notifyCallbackWhenNotFull(final Runnable callback) {
        if (parent != null) {
            Runnable r = new Runnable() {

                @Override
                public void run() {
                    usageLock.writeLock().lock();
                    try {
                        if (percentUsage >= 100) {
                            callbacks.add(callback);
                        } else {
                            callback.run();
                        }
                    } finally {
                        usageLock.writeLock().unlock();
                    }
                }
            };
            if (parent.notifyCallbackWhenNotFull(r)) {
                return true;
            }
        }
        usageLock.writeLock().lock();
        try {
            if (percentUsage >= 100) {
                callbacks.add(callback);
                return true;
            } else {
                return false;
            }
        } finally {
            usageLock.writeLock().unlock();
        }
    }

    /**
     * @return the limiter
     */
    public UsageCapacity getLimiter() {
        return this.limiter;
    }

    /**
     * @param limiter
     *            the limiter to set
     */
    public void setLimiter(UsageCapacity limiter) {
        this.limiter = limiter;
    }

    /**
     * @return the pollingTime
     */
    public int getPollingTime() {
        return this.pollingTime;
    }

    /**
     * @param pollingTime
     *            the pollingTime to set
     */
    public void setPollingTime(int pollingTime) {
        this.pollingTime = pollingTime;
    }

    public void setName(String name) {
        this.name = name;
    }

    public T getParent() {
        return parent;
    }

    public void setParent(T parent) {
        this.parent = parent;
    }

    public void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    public boolean isStarted() {
        return started.get();
    }
}