001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.ArrayList; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.List; 023import org.apache.activemq.broker.ConnectionContext; 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.MessageReference; 026import org.apache.activemq.broker.region.QueueMessageReference; 027 028/** 029 * hold pending messages in a linked list (messages awaiting disptach to a 030 * consumer) cursor 031 * 032 * 033 */ 034public class VMPendingMessageCursor extends AbstractPendingMessageCursor { 035 private final PendingList list; 036 private Iterator<MessageReference> iter; 037 038 public VMPendingMessageCursor(boolean prioritizedMessages) { 039 super(prioritizedMessages); 040 if (this.prioritizedMessages) { 041 this.list= new PrioritizedPendingList(); 042 }else { 043 this.list = new OrderedPendingList(); 044 } 045 } 046 047 048 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) 049 throws Exception { 050 List<MessageReference> rc = new ArrayList<MessageReference>(); 051 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 052 MessageReference r = iterator.next(); 053 if (r.getRegionDestination() == destination) { 054 r.decrementReferenceCount(); 055 rc.add(r); 056 iterator.remove(); 057 } 058 } 059 return rc; 060 } 061 062 /** 063 * @return true if there are no pending messages 064 */ 065 066 public synchronized boolean isEmpty() { 067 if (list.isEmpty()) { 068 return true; 069 } else { 070 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { 071 MessageReference node = iterator.next(); 072 if (node == QueueMessageReference.NULL_MESSAGE) { 073 continue; 074 } 075 if (!node.isDropped()) { 076 return false; 077 } 078 // We can remove dropped references. 079 iterator.remove(); 080 } 081 return true; 082 } 083 } 084 085 /** 086 * reset the cursor 087 */ 088 089 public synchronized void reset() { 090 iter = list.iterator(); 091 last = null; 092 } 093 094 /** 095 * add message to await dispatch 096 * 097 * @param node 098 */ 099 100 public synchronized boolean addMessageLast(MessageReference node) { 101 node.incrementReferenceCount(); 102 list.addMessageLast(node); 103 return true; 104 } 105 106 /** 107 * add message to await dispatch 108 * 109 * @param node 110 */ 111 112 public synchronized void addMessageFirst(MessageReference node) { 113 node.incrementReferenceCount(); 114 list.addMessageFirst(node); 115 } 116 117 /** 118 * @return true if there pending messages to dispatch 119 */ 120 121 public synchronized boolean hasNext() { 122 return iter.hasNext(); 123 } 124 125 /** 126 * @return the next pending message 127 */ 128 129 public synchronized MessageReference next() { 130 last = iter.next(); 131 if (last != null) { 132 last.incrementReferenceCount(); 133 } 134 return last; 135 } 136 137 /** 138 * remove the message at the cursor position 139 */ 140 141 public synchronized void remove() { 142 if (last != null) { 143 last.decrementReferenceCount(); 144 } 145 iter.remove(); 146 } 147 148 /** 149 * @return the number of pending messages 150 */ 151 152 public synchronized int size() { 153 return list.size(); 154 } 155 156 /** 157 * clear all pending messages 158 */ 159 160 public synchronized void clear() { 161 for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) { 162 MessageReference ref = i.next(); 163 ref.decrementReferenceCount(); 164 } 165 list.clear(); 166 } 167 168 169 public synchronized void remove(MessageReference node) { 170 list.remove(node); 171 node.decrementReferenceCount(); 172 } 173 174 /** 175 * Page in a restricted number of messages 176 * 177 * @param maxItems 178 * @return a list of paged in messages 179 */ 180 181 public LinkedList<MessageReference> pageInList(int maxItems) { 182 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 183 for (Iterator<MessageReference>i = list.iterator();i.hasNext();) { 184 MessageReference ref = i.next(); 185 ref.incrementReferenceCount(); 186 result.add(ref); 187 if (result.size() >= maxItems) { 188 break; 189 } 190 } 191 return result; 192 } 193 194 195 public boolean isTransient() { 196 return true; 197 } 198 199 200 public void destroy() throws Exception { 201 super.destroy(); 202 clear(); 203 } 204}