001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
020
021import java.util.ArrayDeque;
022import java.util.Collection;
023import java.util.Deque;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.command.MessageId;
030
031public class PrioritizedPendingList implements PendingList {
032
033    private static final Integer MAX_PRIORITY = 10;
034    private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
035    private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
036
037    public PrioritizedPendingList() {
038        for (int i = 0; i < MAX_PRIORITY; i++) {
039            this.lists[i] = new OrderedPendingList();
040        }
041    }
042
043    public PendingNode addMessageFirst(MessageReference message) {
044        PendingNode node = getList(message).addMessageFirst(message);
045        this.map.put(message.getMessageId(), node);
046        return node;
047    }
048
049    public PendingNode addMessageLast(MessageReference message) {
050        PendingNode node = getList(message).addMessageLast(message);
051        this.map.put(message.getMessageId(), node);
052        return node;
053    }
054
055    public void clear() {
056        for (int i = 0; i < MAX_PRIORITY; i++) {
057            this.lists[i].clear();
058        }
059        this.map.clear();
060    }
061
062    public boolean isEmpty() {
063        return this.map.isEmpty();
064    }
065
066    public Iterator<MessageReference> iterator() {
067        return new PrioritizedPendingListIterator();
068    }
069
070    public PendingNode remove(MessageReference message) {
071        PendingNode node = null;
072        if (message != null) {
073            node = this.map.remove(message.getMessageId());
074            if (node != null) {
075                node.getList().removeNode(node);
076            }
077        }
078        return node;
079    }
080
081    public int size() {
082        return this.map.size();
083    }
084
085    @Override
086    public String toString() {
087        return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
088    }
089
090    protected int getPriority(MessageReference message) {
091        int priority = javax.jms.Message.DEFAULT_PRIORITY;
092        if (message.getMessageId() != null) {
093            priority = Math.max(message.getMessage().getPriority(), 0);
094            priority = Math.min(priority, 9);
095        }
096        return priority;
097    }
098
099    protected OrderedPendingList getList(MessageReference msg) {
100        return lists[getPriority(msg)];
101    }
102
103    private final class PrioritizedPendingListIterator implements Iterator<MessageReference> {
104
105        private final Deque<Iterator<MessageReference>> iterators = new ArrayDeque<Iterator<MessageReference>>();
106
107        private Iterator<MessageReference> current;
108        private MessageReference currentMessage;
109
110        PrioritizedPendingListIterator() {
111            for (OrderedPendingList list : lists) {
112                if (!list.isEmpty()) {
113                    iterators.push(list.iterator());
114                }
115            }
116
117            current = iterators.poll();
118        }
119
120        @Override
121        public boolean hasNext() {
122            while (current != null) {
123                if (current.hasNext()) {
124                    return true;
125                } else {
126                    current = iterators.poll();
127                }
128            }
129
130            return false;
131        }
132
133        public MessageReference next() {
134            MessageReference result = null;
135
136            while (current != null) {
137                if (current.hasNext()) {
138                    result = currentMessage = current.next();
139                    break;
140                } else {
141                    current = iterators.poll();
142                }
143            }
144
145            return result;
146        }
147
148        public void remove() {
149            if (currentMessage != null) {
150                map.remove(currentMessage.getMessageId());
151                current.remove();
152                currentMessage = null;
153            }
154        }
155    }
156
157    @Override
158    public boolean contains(MessageReference message) {
159        if (message != null) {
160            return this.map.containsKey(message.getMessageId());
161        }
162        return false;
163    }
164
165    @Override
166    public Collection<MessageReference> values() {
167        return getValues(this);
168    }
169
170    @Override
171    public void addAll(PendingList pendingList) {
172        for(MessageReference messageReference : pendingList) {
173            addMessageLast(messageReference);
174        }
175    }
176
177    @Override
178    public MessageReference get(MessageId messageId) {
179        PendingNode node = map.get(messageId);
180        if (node != null) {
181            return node.getMessage();
182        }
183        return null;
184    }
185
186}