001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.activemq.broker.Broker; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.broker.region.Destination; 030import org.apache.activemq.broker.region.IndirectMessageReference; 031import org.apache.activemq.broker.region.MessageReference; 032import org.apache.activemq.broker.region.QueueMessageReference; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 035import org.apache.activemq.openwire.OpenWireFormat; 036import org.apache.activemq.store.PList; 037import org.apache.activemq.store.PListEntry; 038import org.apache.activemq.store.PListStore; 039import org.apache.activemq.usage.SystemUsage; 040import org.apache.activemq.usage.Usage; 041import org.apache.activemq.usage.UsageListener; 042import org.apache.activemq.util.ByteSequence; 043import org.apache.activemq.wireformat.WireFormat; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * persist pending messages pending message (messages awaiting dispatch to a 049 * consumer) cursor 050 */ 051public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 052 053 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); 054 055 private static final AtomicLong NAME_COUNT = new AtomicLong(); 056 057 protected Broker broker; 058 private final PListStore store; 059 private final String name; 060 private PendingList memoryList; 061 private PList diskList; 062 private Iterator<MessageReference> iter; 063 private Destination regionDestination; 064 private boolean iterating; 065 private boolean flushRequired; 066 private final AtomicBoolean started = new AtomicBoolean(); 067 private final WireFormat wireFormat = new OpenWireFormat(); 068 069 /** 070 * @param broker 071 * @param name 072 * @param prioritizedMessages 073 */ 074 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { 075 super(prioritizedMessages); 076 if (this.prioritizedMessages) { 077 this.memoryList = new PrioritizedPendingList(); 078 } else { 079 this.memoryList = new OrderedPendingList(); 080 } 081 this.broker = broker; 082 // the store can be null if the BrokerService has persistence 083 // turned off 084 this.store = broker.getTempDataStore(); 085 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 086 } 087 088 @Override 089 public void start() throws Exception { 090 if (started.compareAndSet(false, true)) { 091 if( this.broker != null) { 092 wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion()); 093 } 094 super.start(); 095 if (systemUsage != null) { 096 systemUsage.getMemoryUsage().addUsageListener(this); 097 } 098 } 099 } 100 101 @Override 102 public void stop() throws Exception { 103 if (started.compareAndSet(true, false)) { 104 super.stop(); 105 if (systemUsage != null) { 106 systemUsage.getMemoryUsage().removeUsageListener(this); 107 } 108 } 109 } 110 111 /** 112 * @return true if there are no pending messages 113 */ 114 @Override 115 public synchronized boolean isEmpty() { 116 if (memoryList.isEmpty() && isDiskListEmpty()) { 117 return true; 118 } 119 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 120 MessageReference node = iterator.next(); 121 if (node == QueueMessageReference.NULL_MESSAGE) { 122 continue; 123 } 124 if (!node.isDropped()) { 125 return false; 126 } 127 // We can remove dropped references. 128 iterator.remove(); 129 } 130 return isDiskListEmpty(); 131 } 132 133 /** 134 * reset the cursor 135 */ 136 @Override 137 public synchronized void reset() { 138 iterating = true; 139 last = null; 140 if (isDiskListEmpty()) { 141 this.iter = this.memoryList.iterator(); 142 } else { 143 this.iter = new DiskIterator(); 144 } 145 } 146 147 @Override 148 public synchronized void release() { 149 iterating = false; 150 if (iter instanceof DiskIterator) { 151 ((DiskIterator)iter).release(); 152 }; 153 if (flushRequired) { 154 flushRequired = false; 155 if (!hasSpace()) { 156 flushToDisk(); 157 } 158 } 159 // ensure any memory ref is released 160 iter = null; 161 } 162 163 @Override 164 public synchronized void destroy() throws Exception { 165 stop(); 166 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 167 MessageReference node = i.next(); 168 node.decrementReferenceCount(); 169 } 170 memoryList.clear(); 171 destroyDiskList(); 172 } 173 174 private void destroyDiskList() throws Exception { 175 if (diskList != null) { 176 store.removePList(name); 177 diskList = null; 178 } 179 } 180 181 @Override 182 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 183 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 184 int count = 0; 185 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 186 MessageReference ref = i.next(); 187 ref.incrementReferenceCount(); 188 result.add(ref); 189 count++; 190 } 191 if (count < maxItems && !isDiskListEmpty()) { 192 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) { 193 Message message = (Message) i.next(); 194 message.setRegionDestination(regionDestination); 195 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 196 message.incrementReferenceCount(); 197 result.add(message); 198 count++; 199 } 200 } 201 return result; 202 } 203 204 /** 205 * add message to await dispatch 206 * 207 * @param node 208 * @throws Exception 209 */ 210 @Override 211 public synchronized boolean addMessageLast(MessageReference node) throws Exception { 212 return tryAddMessageLast(node, 0); 213 } 214 215 @Override 216 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 217 if (!node.isExpired()) { 218 try { 219 regionDestination = (Destination) node.getMessage().getRegionDestination(); 220 if (isDiskListEmpty()) { 221 if (hasSpace() || this.store == null) { 222 memoryList.addMessageLast(node); 223 node.incrementReferenceCount(); 224 setCacheEnabled(true); 225 return true; 226 } 227 } 228 if (!hasSpace()) { 229 if (isDiskListEmpty()) { 230 expireOldMessages(); 231 if (hasSpace()) { 232 memoryList.addMessageLast(node); 233 node.incrementReferenceCount(); 234 return true; 235 } else { 236 flushToDisk(); 237 } 238 } 239 } 240 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) { 241 ByteSequence bs = getByteSequence(node.getMessage()); 242 getDiskList().addLast(node.getMessageId().toString(), bs); 243 return true; 244 } 245 return false; 246 247 } catch (Exception e) { 248 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 249 throw new RuntimeException(e); 250 } 251 } else { 252 discardExpiredMessage(node); 253 } 254 //message expired 255 return true; 256 } 257 258 /** 259 * add message to await dispatch 260 * 261 * @param node 262 */ 263 @Override 264 public synchronized void addMessageFirst(MessageReference node) { 265 if (!node.isExpired()) { 266 try { 267 regionDestination = (Destination) node.getMessage().getRegionDestination(); 268 if (isDiskListEmpty()) { 269 if (hasSpace()) { 270 memoryList.addMessageFirst(node); 271 node.incrementReferenceCount(); 272 setCacheEnabled(true); 273 return; 274 } 275 } 276 if (!hasSpace()) { 277 if (isDiskListEmpty()) { 278 expireOldMessages(); 279 if (hasSpace()) { 280 memoryList.addMessageFirst(node); 281 node.incrementReferenceCount(); 282 return; 283 } else { 284 flushToDisk(); 285 } 286 } 287 } 288 systemUsage.getTempUsage().waitForSpace(); 289 node.decrementReferenceCount(); 290 ByteSequence bs = getByteSequence(node.getMessage()); 291 Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs); 292 node.getMessageId().setPlistLocator(locator); 293 294 } catch (Exception e) { 295 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 296 throw new RuntimeException(e); 297 } 298 } else { 299 discardExpiredMessage(node); 300 } 301 } 302 303 /** 304 * @return true if there pending messages to dispatch 305 */ 306 @Override 307 public synchronized boolean hasNext() { 308 return iter.hasNext(); 309 } 310 311 /** 312 * @return the next pending message 313 */ 314 @Override 315 public synchronized MessageReference next() { 316 MessageReference reference = iter.next(); 317 last = reference; 318 if (!isDiskListEmpty()) { 319 // got from disk 320 reference.getMessage().setRegionDestination(regionDestination); 321 reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 322 } 323 reference.incrementReferenceCount(); 324 return reference; 325 } 326 327 /** 328 * remove the message at the cursor position 329 */ 330 @Override 331 public synchronized void remove() { 332 iter.remove(); 333 if (last != null) { 334 last.decrementReferenceCount(); 335 } 336 } 337 338 /** 339 * @param node 340 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 341 */ 342 @Override 343 public synchronized void remove(MessageReference node) { 344 if (memoryList.remove(node) != null) { 345 node.decrementReferenceCount(); 346 } 347 if (!isDiskListEmpty()) { 348 try { 349 getDiskList().remove(node.getMessageId().getPlistLocator()); 350 } catch (IOException e) { 351 throw new RuntimeException(e); 352 } 353 } 354 } 355 356 /** 357 * @return the number of pending messages 358 */ 359 @Override 360 public synchronized int size() { 361 return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size()); 362 } 363 364 /** 365 * clear all pending messages 366 */ 367 @Override 368 public synchronized void clear() { 369 memoryList.clear(); 370 if (!isDiskListEmpty()) { 371 try { 372 getDiskList().destroy(); 373 } catch (IOException e) { 374 throw new RuntimeException(e); 375 } 376 } 377 last = null; 378 } 379 380 @Override 381 public synchronized boolean isFull() { 382 return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); 383 } 384 385 @Override 386 public boolean hasMessagesBufferedToDeliver() { 387 return !isEmpty(); 388 } 389 390 @Override 391 public void setSystemUsage(SystemUsage usageManager) { 392 super.setSystemUsage(usageManager); 393 } 394 395 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 396 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 397 List<MessageReference> expiredMessages = null; 398 synchronized (this) { 399 if (!flushRequired && size() != 0) { 400 flushRequired =true; 401 if (!iterating) { 402 expiredMessages = expireOldMessages(); 403 if (!hasSpace()) { 404 flushToDisk(); 405 flushRequired = false; 406 } 407 } 408 } 409 } 410 411 if (expiredMessages != null) { 412 for (MessageReference node : expiredMessages) { 413 discardExpiredMessage(node); 414 } 415 } 416 } 417 } 418 419 @Override 420 public boolean isTransient() { 421 return true; 422 } 423 424 private synchronized List<MessageReference> expireOldMessages() { 425 List<MessageReference> expired = new ArrayList<MessageReference>(); 426 if (!memoryList.isEmpty()) { 427 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 428 MessageReference node = iterator.next(); 429 if (node.isExpired()) { 430 node.decrementReferenceCount(); 431 expired.add(node); 432 iterator.remove(); 433 } 434 } 435 } 436 437 return expired; 438 } 439 440 protected synchronized void flushToDisk() { 441 if (!memoryList.isEmpty() && store != null) { 442 long start = 0; 443 if (LOG.isTraceEnabled()) { 444 start = System.currentTimeMillis(); 445 LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(), 446 (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 447 } 448 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 449 MessageReference node = iterator.next(); 450 node.decrementReferenceCount(); 451 ByteSequence bs; 452 try { 453 bs = getByteSequence(node.getMessage()); 454 getDiskList().addLast(node.getMessageId().toString(), bs); 455 } catch (IOException e) { 456 LOG.error("Failed to write to disk list", e); 457 throw new RuntimeException(e); 458 } 459 460 } 461 memoryList.clear(); 462 setCacheEnabled(false); 463 LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 464 } 465 } 466 467 protected boolean isDiskListEmpty() { 468 return diskList == null || diskList.isEmpty(); 469 } 470 471 public PList getDiskList() { 472 if (diskList == null) { 473 try { 474 diskList = store.getPList(name); 475 } catch (Exception e) { 476 LOG.error("Caught an IO Exception getting the DiskList {}", name, e); 477 throw new RuntimeException(e); 478 } 479 } 480 return diskList; 481 } 482 483 private void discardExpiredMessage(MessageReference reference) { 484 LOG.debug("Discarding expired message {}", reference); 485 if (reference.isExpired() && broker.isExpired(reference)) { 486 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 487 context.setBroker(broker); 488 ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); 489 } 490 } 491 492 protected ByteSequence getByteSequence(Message message) throws IOException { 493 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 494 return new ByteSequence(packet.data, packet.offset, packet.length); 495 } 496 497 protected Message getMessage(ByteSequence bs) throws IOException { 498 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs 499 .getOffset(), bs.getLength()); 500 return (Message) this.wireFormat.unmarshal(packet); 501 502 } 503 504 final class DiskIterator implements Iterator<MessageReference> { 505 private final PList.PListIterator iterator; 506 DiskIterator() { 507 try { 508 iterator = getDiskList().iterator(); 509 } catch (Exception e) { 510 throw new RuntimeException(e); 511 } 512 } 513 514 public boolean hasNext() { 515 return iterator.hasNext(); 516 } 517 518 public MessageReference next() { 519 try { 520 PListEntry entry = iterator.next(); 521 Message message = getMessage(entry.getByteSequence()); 522 message.getMessageId().setPlistLocator(entry.getLocator()); 523 return message; 524 } catch (IOException e) { 525 LOG.error("I/O error", e); 526 throw new RuntimeException(e); 527 } 528 } 529 530 public void remove() { 531 iterator.remove(); 532 } 533 534 public void release() { 535 iterator.release(); 536 } 537 } 538}