001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import org.apache.activemq.broker.region.MessageReference; 020import org.apache.activemq.command.MessageId; 021 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Iterator; 025import java.util.List; 026 027/** 028 * An abstraction that keeps the correct order of messages that need to be dispatched 029 * to consumers, but also hides the fact that there might be redelivered messages that 030 * should be dispatched ahead of any other paged in messages. 031 * 032 * Direct usage of this class is recommended as you can control when redeliveries need 033 * to be added vs regular pending messages (the next set of messages that can be dispatched) 034 * 035 * Created by ceposta 036 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. 037 */ 038public class QueueDispatchPendingList implements PendingList { 039 040 private PendingList pagedInPendingDispatch = new OrderedPendingList(); 041 private PendingList redeliveredWaitingDispatch = new OrderedPendingList(); 042 private boolean prioritized = false; 043 044 045 @Override 046 public boolean isEmpty() { 047 return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty(); 048 } 049 050 @Override 051 public void clear() { 052 pagedInPendingDispatch.clear(); 053 redeliveredWaitingDispatch.clear(); 054 } 055 056 /** 057 * Messages added are added directly to the pagedInPendingDispatch set of messages. If 058 * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() 059 * method 060 * @param message 061 * The MessageReference that is to be added to this list. 062 * 063 * @return 064 */ 065 @Override 066 public PendingNode addMessageFirst(MessageReference message) { 067 return pagedInPendingDispatch.addMessageFirst(message); 068 } 069 070 /** 071 * Messages added are added directly to the pagedInPendingDispatch set of messages. If 072 * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() 073 * method 074 * @param message 075 * The MessageReference that is to be added to this list. 076 * 077 * @return 078 */ 079 @Override 080 public PendingNode addMessageLast(MessageReference message) { 081 return pagedInPendingDispatch.addMessageLast(message); 082 } 083 084 @Override 085 public PendingNode remove(MessageReference message) { 086 if (pagedInPendingDispatch.contains(message)) { 087 return pagedInPendingDispatch.remove(message); 088 } else if (redeliveredWaitingDispatch.contains(message)) { 089 return redeliveredWaitingDispatch.remove(message); 090 } 091 return null; 092 } 093 094 @Override 095 public int size() { 096 return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size(); 097 } 098 099 @Override 100 public Iterator<MessageReference> iterator() { 101 if (prioritized && hasRedeliveries()) { 102 final QueueDispatchPendingList delegate = this; 103 final PrioritizedPendingList priorityOrderedRedeliveredAndPending = new PrioritizedPendingList(); 104 priorityOrderedRedeliveredAndPending.addAll(redeliveredWaitingDispatch); 105 priorityOrderedRedeliveredAndPending.addAll(pagedInPendingDispatch); 106 107 return new Iterator<MessageReference>() { 108 109 Iterator<MessageReference> combinedIterator = priorityOrderedRedeliveredAndPending.iterator(); 110 MessageReference current = null; 111 112 @Override 113 public boolean hasNext() { 114 return combinedIterator.hasNext(); 115 } 116 117 @Override 118 public MessageReference next() { 119 current = combinedIterator.next(); 120 return current; 121 } 122 123 @Override 124 public void remove() { 125 if (current!=null) { 126 delegate.remove(current); 127 } 128 } 129 }; 130 131 } else { 132 133 return new Iterator<MessageReference>() { 134 135 Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator(); 136 Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator(); 137 Iterator<MessageReference> current = redeliveries; 138 139 140 @Override 141 public boolean hasNext() { 142 if (!redeliveries.hasNext() && (current == redeliveries)) { 143 current = pendingDispatch; 144 } 145 return current.hasNext(); 146 } 147 148 @Override 149 public MessageReference next() { 150 return current.next(); 151 } 152 153 @Override 154 public void remove() { 155 current.remove(); 156 } 157 }; 158 } 159 } 160 161 @Override 162 public boolean contains(MessageReference message) { 163 return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message); 164 } 165 166 @Override 167 public Collection<MessageReference> values() { 168 List<MessageReference> messageReferences = new ArrayList<MessageReference>(); 169 Iterator<MessageReference> iterator = iterator(); 170 while (iterator.hasNext()) { 171 messageReferences.add(iterator.next()); 172 } 173 return messageReferences; 174 } 175 176 @Override 177 public void addAll(PendingList pendingList) { 178 pagedInPendingDispatch.addAll(pendingList); 179 } 180 181 @Override 182 public MessageReference get(MessageId messageId) { 183 MessageReference rc = pagedInPendingDispatch.get(messageId); 184 if (rc == null) { 185 return redeliveredWaitingDispatch.get(messageId); 186 } 187 return rc; 188 } 189 190 public void setPrioritizedMessages(boolean prioritizedMessages) { 191 prioritized = prioritizedMessages; 192 if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { 193 pagedInPendingDispatch = new PrioritizedPendingList(); 194 redeliveredWaitingDispatch = new PrioritizedPendingList(); 195 } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { 196 pagedInPendingDispatch = new OrderedPendingList(); 197 redeliveredWaitingDispatch = new OrderedPendingList(); 198 } 199 } 200 201 public boolean hasRedeliveries(){ 202 return !redeliveredWaitingDispatch.isEmpty(); 203 } 204 205 public void addForRedelivery(List<MessageReference> list, boolean noConsumers) { 206 if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) { 207 // a single consumer can expect repeatable redelivery order irrespective 208 // of transaction or prefetch boundaries 209 ((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list); 210 } else { 211 for (MessageReference ref : list) { 212 redeliveredWaitingDispatch.addMessageLast(ref); 213 } 214 } 215 } 216 217 private boolean willBeInOrder(List<MessageReference> list) { 218 // for a single consumer inserting at head will be in order w.r.t brokerSequence but 219 // will not be if there were multiple consumers in the mix even if this is the last 220 // consumer to close (noConsumers==true) 221 return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() && 222 redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId(); 223 } 224}