001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.region.MessageReference;
020import org.apache.activemq.command.MessageId;
021
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026
027/**
028 * An abstraction that keeps the correct order of messages that need to be dispatched
029 * to consumers, but also hides the fact that there might be redelivered messages that
030 * should be dispatched ahead of any other paged in messages.
031 *
032 * Direct usage of this class is recommended as you can control when redeliveries need
033 * to be added vs regular pending messages (the next set of messages that can be dispatched)
034 *
035 * Created by ceposta
036 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
037 */
038public class QueueDispatchPendingList implements PendingList {
039
040    private PendingList pagedInPendingDispatch = new OrderedPendingList();
041    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
042    private boolean prioritized = false;
043
044
045    @Override
046    public boolean isEmpty() {
047        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
048    }
049
050    @Override
051    public void clear() {
052        pagedInPendingDispatch.clear();
053        redeliveredWaitingDispatch.clear();
054    }
055
056    /**
057     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
058     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
059     * method
060     * @param message
061     *      The MessageReference that is to be added to this list.
062     *
063     * @return
064     */
065    @Override
066    public PendingNode addMessageFirst(MessageReference message) {
067        return pagedInPendingDispatch.addMessageFirst(message);
068    }
069
070    /**
071     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
072     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
073     * method
074     * @param message
075     *      The MessageReference that is to be added to this list.
076     *
077     * @return
078     */
079    @Override
080    public PendingNode addMessageLast(MessageReference message) {
081        return pagedInPendingDispatch.addMessageLast(message);
082    }
083
084    @Override
085    public PendingNode remove(MessageReference message) {
086        if (pagedInPendingDispatch.contains(message)) {
087            return pagedInPendingDispatch.remove(message);
088        } else if (redeliveredWaitingDispatch.contains(message)) {
089            return redeliveredWaitingDispatch.remove(message);
090        }
091        return null;
092    }
093
094    @Override
095    public int size() {
096        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
097    }
098
099    @Override
100    public Iterator<MessageReference> iterator() {
101        if (prioritized && hasRedeliveries()) {
102            final QueueDispatchPendingList delegate = this;
103            final PrioritizedPendingList  priorityOrderedRedeliveredAndPending = new PrioritizedPendingList();
104            priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch);
105            priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch);
106
107            return new Iterator<MessageReference>() {
108
109                Iterator<MessageReference> combinedIterator = priorityOrderedRedeliveredAndPending.iterator();
110                MessageReference current = null;
111
112                @Override
113                public boolean hasNext() {
114                    return combinedIterator.hasNext();
115                }
116
117                @Override
118                public MessageReference next() {
119                    current = combinedIterator.next();
120                    return current;
121                }
122
123                @Override
124                public void remove() {
125                    if (current!=null) {
126                        delegate.remove(current);
127                    }
128                }
129            };
130
131        } else {
132
133            return new Iterator<MessageReference>() {
134
135                Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
136                Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
137                Iterator<MessageReference> current = redeliveries;
138
139
140                @Override
141                public boolean hasNext() {
142                    if (!redeliveries.hasNext() && (current == redeliveries)) {
143                        current = pendingDispatch;
144                    }
145                    return current.hasNext();
146                }
147
148                @Override
149                public MessageReference next() {
150                    return current.next();
151                }
152
153                @Override
154                public void remove() {
155                    current.remove();
156                }
157            };
158        }
159    }
160
161    @Override
162    public boolean contains(MessageReference message) {
163        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
164    }
165
166    @Override
167    public Collection<MessageReference> values() {
168        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
169        Iterator<MessageReference> iterator = iterator();
170        while (iterator.hasNext()) {
171            messageReferences.add(iterator.next());
172        }
173        return messageReferences;
174    }
175
176    @Override
177    public void addAll(PendingList pendingList) {
178        pagedInPendingDispatch.addAll(pendingList);
179    }
180
181    @Override
182    public MessageReference get(MessageId messageId) {
183        MessageReference rc = pagedInPendingDispatch.get(messageId);
184        if (rc == null) {
185            return redeliveredWaitingDispatch.get(messageId);
186        }
187        return rc;
188    }
189
190    public void setPrioritizedMessages(boolean prioritizedMessages) {
191        prioritized = prioritizedMessages;
192        if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
193            pagedInPendingDispatch = new PrioritizedPendingList();
194            redeliveredWaitingDispatch = new PrioritizedPendingList();
195        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
196            pagedInPendingDispatch = new OrderedPendingList();
197            redeliveredWaitingDispatch = new OrderedPendingList();
198        }
199    }
200
201    public boolean hasRedeliveries(){
202        return !redeliveredWaitingDispatch.isEmpty();
203    }
204
205    public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
206        if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) {
207            // a single consumer can expect repeatable redelivery order irrespective
208            // of transaction or prefetch boundaries
209            ((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list);
210        } else {
211            for (MessageReference ref : list) {
212                redeliveredWaitingDispatch.addMessageLast(ref);
213            }
214        }
215    }
216
217    private boolean willBeInOrder(List<MessageReference> list) {
218        // for a single consumer inserting at head will be in order w.r.t brokerSequence but
219        // will not be if there were multiple consumers in the mix even if this is the last
220        // consumer to close (noConsumers==true)
221        return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() &&
222            redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId();
223    }
224}