/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.activemq.command.MessageDispatch;

/**
 * A {@link MessageDispatchChannel} that keeps one FIFO list per priority
 * level and always hands out the head of the highest non-empty priority.
 * <p>
 * Every mutation of the per-priority lists, the {@code size} counter and the
 * {@code closed}/{@code running} flags is performed while holding
 * {@link #getMutex()}. The accessors {@link #isEmpty()}, {@link #isClosed()}
 * and {@link #isRunning()} read their field without taking that lock.
 */
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {

    /** Number of priority buckets; valid bucket indexes are 0 .. MAX_PRIORITY - 1. */
    private static final int MAX_PRIORITY = 10;

    private final Object mutex = new Object();
    /** One FIFO list per priority; index == priority. */
    private final LinkedList<MessageDispatch>[] lists;
    private boolean closed;
    private boolean running;
    /** Total number of messages across all priority lists. */
    private int size = 0;

    /**
     * Creates an empty channel with one list per priority level. The channel
     * is neither running nor closed until {@link #start()} or
     * {@link #close()} is called.
     */
    @SuppressWarnings("unchecked")
    public SimplePriorityMessageDispatchChannel() {
        // Generic array creation is not allowed, hence the raw array + suppression.
        this.lists = new LinkedList[MAX_PRIORITY];
        for (int i = 0; i < MAX_PRIORITY; i++) {
            lists[i] = new LinkedList<MessageDispatch>();
        }
    }

    /**
     * Appends the message to the tail of the list for its priority and wakes
     * one thread waiting on the mutex.
     *
     * @param message the dispatch to queue
     * @see org.apache.activemq.MessageDispatchChannel#enqueue(org.apache.activemq.command.MessageDispatch)
     */
    @Override
    public void enqueue(MessageDispatch message) {
        synchronized (mutex) {
            getList(message).addLast(message);
            this.size++;
            mutex.notify();
        }
    }

    /**
     * Inserts the message at the head of the list for its priority and wakes
     * one thread waiting on the mutex.
     *
     * @param message the dispatch to queue ahead of others of the same priority
     * @see org.apache.activemq.MessageDispatchChannel#enqueueFirst(org.apache.activemq.command.MessageDispatch)
     */
    @Override
    public void enqueueFirst(MessageDispatch message) {
        synchronized (mutex) {
            getList(message).addFirst(message);
            this.size++;
            mutex.notify();
        }
    }

    /**
     * Reports whether the channel currently holds no messages. This reads the
     * size counter without acquiring the mutex.
     *
     * @return {@code true} if the size counter is zero
     * @see org.apache.activemq.MessageDispatchChannel#isEmpty()
     */
    @Override
    public boolean isEmpty() {
        return this.size == 0;
    }

    /**
     * Removes and returns the highest-priority message, optionally waiting
     * for one to become deliverable.
     * <ul>
     * <li>{@code timeout == 0}: never waits.</li>
     * <li>{@code timeout == -1}: waits until the channel is closed, or is
     * running and non-empty.</li>
     * <li>any other value: waits on the mutex at most once, for up to
     * {@code timeout} milliseconds, then re-checks the state. The call can
     * therefore return {@code null} before the timeout has fully elapsed if
     * the thread is woken without a deliverable message.</li>
     * </ul>
     *
     * @param timeout wait policy in milliseconds, as described above; the
     *        value is handed to {@link Object#wait(long)} unchanged, which
     *        rejects negative arguments
     * @return the dequeued message, or {@code null} if the channel is closed,
     *         not running, or empty after the wait
     * @throws InterruptedException if the thread is interrupted while waiting
     * @see org.apache.activemq.MessageDispatchChannel#dequeue(long)
     */
    @Override
    public MessageDispatch dequeue(long timeout) throws InterruptedException {
        synchronized (mutex) {
            // Wait until the consumer is ready to deliver messages.
            while (timeout != 0 && !closed && (isEmpty() || !running)) {
                if (timeout == -1) {
                    mutex.wait();
                } else {
                    // Timed wait happens exactly once; the state is re-checked below.
                    mutex.wait(timeout);
                    break;
                }
            }
            if (closed || !running || isEmpty()) {
                return null;
            }
            return removeFirst();
        }
    }

    /**
     * Removes and returns the highest-priority message without waiting.
     *
     * @return the dequeued message, or {@code null} if the channel is closed,
     *         not running, or empty
     * @see org.apache.activemq.MessageDispatchChannel#dequeueNoWait()
     */
    @Override
    public MessageDispatch dequeueNoWait() {
        synchronized (mutex) {
            if (closed || !running || isEmpty()) {
                return null;
            }
            return removeFirst();
        }
    }

    /**
     * Returns the highest-priority message without removing it.
     *
     * @return the message that would be dequeued next, or {@code null} if the
     *         channel is closed, not running, or empty
     * @see org.apache.activemq.MessageDispatchChannel#peek()
     */
    @Override
    public MessageDispatch peek() {
        synchronized (mutex) {
            if (closed || !running || isEmpty()) {
                return null;
            }
            return getFirst();
        }
    }

    /**
     * Marks the channel as running and wakes all threads waiting on the mutex.
     *
     * @see org.apache.activemq.MessageDispatchChannel#start()
     */
    @Override
    public void start() {
        synchronized (mutex) {
            running = true;
            mutex.notifyAll();
        }
    }

    /**
     * Marks the channel as not running and wakes all threads waiting on the
     * mutex. Queued messages are kept.
     *
     * @see org.apache.activemq.MessageDispatchChannel#stop()
     */
    @Override
    public void stop() {
        synchronized (mutex) {
            running = false;
            mutex.notifyAll();
        }
    }

    /**
     * Marks the channel as closed and not running (if it was not already
     * closed) and wakes all threads waiting on the mutex. Queued messages
     * are not removed by this call.
     *
     * @see org.apache.activemq.MessageDispatchChannel#close()
     */
    @Override
    public void close() {
        synchronized (mutex) {
            if (!closed) {
                running = false;
                closed = true;
            }
            mutex.notifyAll();
        }
    }

    /**
     * Discards every queued message and resets the size counter to zero.
     * Waiting threads are not notified.
     *
     * @see org.apache.activemq.MessageDispatchChannel#clear()
     */
    @Override
    public void clear() {
        synchronized (mutex) {
            for (int i = 0; i < MAX_PRIORITY; i++) {
                lists[i].clear();
            }
            this.size = 0;
        }
    }

    /**
     * Reports whether {@link #close()} has been called. Reads the flag
     * without acquiring the mutex.
     *
     * @return {@code true} once the channel has been closed
     * @see org.apache.activemq.MessageDispatchChannel#isClosed()
     */
    @Override
    public boolean isClosed() {
        return closed;
    }

    /**
     * Returns the number of queued messages, read while holding the mutex.
     *
     * @return total message count across all priorities
     * @see org.apache.activemq.MessageDispatchChannel#size()
     */
    @Override
    public int size() {
        synchronized (mutex) {
            return this.size;
        }
    }

    /**
     * Returns the monitor object this channel synchronizes on.
     *
     * @return the channel's mutex
     * @see org.apache.activemq.MessageDispatchChannel#getMutex()
     */
    @Override
    public Object getMutex() {
        return mutex;
    }

    /**
     * Reports whether the channel is currently running. Reads the flag
     * without acquiring the mutex.
     *
     * @return {@code true} between {@link #start()} and the next
     *         {@link #stop()} or {@link #close()}
     * @see org.apache.activemq.MessageDispatchChannel#isRunning()
     */
    @Override
    public boolean isRunning() {
        return running;
    }

    /**
     * Drains the channel and returns everything that was queued, ordered from
     * highest priority to lowest and, within a priority, in queue order.
     *
     * @return a new list holding all removed messages; empty if none were queued
     * @see org.apache.activemq.MessageDispatchChannel#removeAll()
     */
    @Override
    public List<MessageDispatch> removeAll() {
        synchronized (mutex) {
            ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
                List<MessageDispatch> list = lists[i];
                result.addAll(list);
                size -= list.size();
                list.clear();
            }
            return result;
        }
    }

    /**
     * Renders each priority bucket as {@code <priority>:{<list>}}, from the
     * highest priority down to 0, concatenated without separators.
     *
     * @return a textual dump of all priority lists
     */
    @Override
    public String toString() {
        StringBuilder result = new StringBuilder();
        // The lists are only ever mutated under the mutex, so iterate them under it
        // too; otherwise a concurrent enqueue can break the list iteration.
        synchronized (mutex) {
            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
                result.append(i).append(":{").append(lists[i].toString()).append("}");
            }
        }
        return result.toString();
    }

    /**
     * Determines the bucket index for a dispatch.
     *
     * @param message the dispatch to classify
     * @return the wrapped message's priority clamped to
     *         {@code 0 .. MAX_PRIORITY - 1}, or
     *         {@code javax.jms.Message.DEFAULT_PRIORITY} when the dispatch
     *         carries no message
     */
    protected int getPriority(MessageDispatch message) {
        int priority = javax.jms.Message.DEFAULT_PRIORITY;
        if (message.getMessage() != null) {
            // Clamp so the value is always a valid index into the lists array.
            priority = Math.max(message.getMessage().getPriority(), 0);
            priority = Math.min(priority, MAX_PRIORITY - 1);
        }
        return priority;
    }

    /**
     * Returns the list that holds messages of the given dispatch's priority.
     *
     * @param md the dispatch whose priority selects the list
     * @return the backing list for that priority
     */
    protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
        return lists[getPriority(md)];
    }

    /**
     * Removes the head of the highest non-empty priority list and decrements
     * the size counter. Callers in this class invoke it while holding the mutex.
     *
     * @return the removed message, or {@code null} if nothing is queued
     */
    private MessageDispatch removeFirst() {
        if (this.size > 0) {
            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
                LinkedList<MessageDispatch> list = lists[i];
                if (!list.isEmpty()) {
                    this.size--;
                    return list.removeFirst();
                }
            }
        }
        return null;
    }

    /**
     * Returns, without removing it, the head of the highest non-empty
     * priority list. Callers in this class invoke it while holding the mutex.
     *
     * @return the next message to be dequeued, or {@code null} if nothing is queued
     */
    private MessageDispatch getFirst() {
        if (this.size > 0) {
            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
                LinkedList<MessageDispatch> list = lists[i];
                if (!list.isEmpty()) {
                    return list.getFirst();
                }
            }
        }
        return null;
    }
}