001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.ArrayList; 020import java.util.LinkedList; 021import java.util.List; 022import org.apache.activemq.command.MessageDispatch; 023 024public class FifoMessageDispatchChannel implements MessageDispatchChannel { 025 026 private final Object mutex = new Object(); 027 private final LinkedList<MessageDispatch> list; 028 private boolean closed; 029 private boolean running; 030 031 public FifoMessageDispatchChannel() { 032 this.list = new LinkedList<MessageDispatch>(); 033 } 034 035 /* (non-Javadoc) 036 * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch) 037 */ 038 public void enqueue(MessageDispatch message) { 039 synchronized (mutex) { 040 list.addLast(message); 041 mutex.notify(); 042 } 043 } 044 045 /* (non-Javadoc) 046 * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch) 047 */ 048 public void enqueueFirst(MessageDispatch message) { 049 synchronized (mutex) { 050 list.addFirst(message); 051 mutex.notify(); 052 } 053 } 054 055 /* (non-Javadoc) 056 * @see org.apache.activemq.MessageDispatchChannelI#isEmpty() 057 */ 058 public boolean isEmpty() { 059 synchronized (mutex) { 060 return list.isEmpty(); 061 } 062 } 063 064 /* (non-Javadoc) 065 * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long) 066 */ 067 public MessageDispatch dequeue(long timeout) throws InterruptedException { 068 synchronized (mutex) { 069 // Wait until the consumer is ready to deliver messages. 070 while (timeout != 0 && !closed && (list.isEmpty() || !running)) { 071 if (timeout == -1) { 072 mutex.wait(); 073 } else { 074 mutex.wait(timeout); 075 break; 076 } 077 } 078 if (closed || !running || list.isEmpty()) { 079 return null; 080 } 081 return list.removeFirst(); 082 } 083 } 084 085 /* (non-Javadoc) 086 * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait() 087 */ 088 public MessageDispatch dequeueNoWait() { 089 synchronized (mutex) { 090 if (closed || !running || list.isEmpty()) { 091 return null; 092 } 093 return list.removeFirst(); 094 } 095 } 096 097 /* (non-Javadoc) 098 * @see org.apache.activemq.MessageDispatchChannelI#peek() 099 */ 100 public MessageDispatch peek() { 101 synchronized (mutex) { 102 if (closed || !running || list.isEmpty()) { 103 return null; 104 } 105 return list.getFirst(); 106 } 107 } 108 109 /* (non-Javadoc) 110 * @see org.apache.activemq.MessageDispatchChannelI#start() 111 */ 112 public void start() { 113 synchronized (mutex) { 114 running = true; 115 mutex.notifyAll(); 116 } 117 } 118 119 /* (non-Javadoc) 120 * @see org.apache.activemq.MessageDispatchChannelI#stop() 121 */ 122 public void stop() { 123 synchronized (mutex) { 124 running = false; 125 mutex.notifyAll(); 126 } 127 } 128 129 /* (non-Javadoc) 130 * @see org.apache.activemq.MessageDispatchChannelI#close() 131 */ 132 public void close() { 133 synchronized (mutex) { 134 if (!closed) { 135 running = false; 136 closed = true; 137 } 138 mutex.notifyAll(); 139 } 140 } 141 142 /* (non-Javadoc) 143 * @see org.apache.activemq.MessageDispatchChannelI#clear() 144 */ 145 public void clear() { 146 synchronized (mutex) { 147 list.clear(); 148 } 149 } 150 151 /* (non-Javadoc) 152 * @see org.apache.activemq.MessageDispatchChannelI#isClosed() 153 */ 154 public boolean isClosed() { 155 return closed; 156 } 157 158 /* (non-Javadoc) 159 * @see org.apache.activemq.MessageDispatchChannelI#size() 160 */ 161 public int size() { 162 synchronized (mutex) { 163 return list.size(); 164 } 165 } 166 167 /* (non-Javadoc) 168 * @see org.apache.activemq.MessageDispatchChannelI#getMutex() 169 */ 170 public Object getMutex() { 171 return mutex; 172 } 173 174 /* (non-Javadoc) 175 * @see org.apache.activemq.MessageDispatchChannelI#isRunning() 176 */ 177 public boolean isRunning() { 178 return running; 179 } 180 181 /* (non-Javadoc) 182 * @see org.apache.activemq.MessageDispatchChannelI#removeAll() 183 */ 184 public List<MessageDispatch> removeAll() { 185 synchronized (mutex) { 186 ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list); 187 list.clear(); 188 return rc; 189 } 190 } 191 192 @Override 193 public String toString() { 194 synchronized (mutex) { 195 return list.toString(); 196 } 197 } 198}