001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.Collections; 021import java.util.Iterator; 022import java.util.LinkedHashMap; 023import java.util.Map; 024import java.util.Map.Entry; 025 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.Message; 029import org.apache.activemq.command.MessageAck; 030import org.apache.activemq.command.MessageId; 031import org.apache.activemq.store.IndexListener; 032import org.apache.activemq.store.MessageRecoveryListener; 033import org.apache.activemq.store.AbstractMessageStore; 034 035/** 036 * An implementation of {@link org.apache.activemq.store.MessageStore} which 037 * uses a 038 * 039 * 040 */ 041public class MemoryMessageStore extends AbstractMessageStore { 042 043 protected final Map<MessageId, Message> messageTable; 044 protected MessageId lastBatchId; 045 protected long sequenceId; 046 047 public MemoryMessageStore(ActiveMQDestination destination) { 048 this(destination, new LinkedHashMap<MessageId, Message>()); 049 } 050 051 public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) { 052 super(destination); 053 this.messageTable = Collections.synchronizedMap(messageTable); 054 } 055 056 public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { 057 synchronized (messageTable) { 058 messageTable.put(message.getMessageId(), message); 059 message.incrementReferenceCount(); 060 message.getMessageId().setFutureOrSequenceLong(sequenceId++); 061 if (indexListener != null) { 062 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 063 } 064 } 065 } 066 067 // public void addMessageReference(ConnectionContext context,MessageId 068 // messageId,long expirationTime,String messageRef) 069 // throws IOException{ 070 // synchronized(messageTable){ 071 // messageTable.put(messageId,messageRef); 072 // } 073 // } 074 075 public Message getMessage(MessageId identity) throws IOException { 076 return messageTable.get(identity); 077 } 078 079 // public String getMessageReference(MessageId identity) throws IOException{ 080 // return (String)messageTable.get(identity); 081 // } 082 083 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 084 removeMessage(ack.getLastMessageId()); 085 } 086 087 public void removeMessage(MessageId msgId) throws IOException { 088 synchronized (messageTable) { 089 Message removed = messageTable.remove(msgId); 090 if( removed !=null ) { 091 removed.decrementReferenceCount(); 092 } 093 if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { 094 lastBatchId = null; 095 } 096 } 097 } 098 099 public void recover(MessageRecoveryListener listener) throws Exception { 100 // the message table is a synchronizedMap - so just have to synchronize 101 // here 102 synchronized (messageTable) { 103 for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) { 104 Object msg = iter.next(); 105 if (msg.getClass() == MessageId.class) { 106 listener.recoverMessageReference((MessageId)msg); 107 } else { 108 listener.recoverMessage((Message)msg); 109 } 110 } 111 } 112 } 113 114 public void removeAllMessages(ConnectionContext context) throws IOException { 115 synchronized (messageTable) { 116 messageTable.clear(); 117 } 118 } 119 120 public void delete() { 121 synchronized (messageTable) { 122 messageTable.clear(); 123 } 124 } 125 126 127 public int getMessageCount() { 128 return messageTable.size(); 129 } 130 131 public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { 132 synchronized (messageTable) { 133 boolean pastLackBatch = lastBatchId == null; 134 int count = 0; 135 for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) { 136 Map.Entry entry = (Entry)iter.next(); 137 if (pastLackBatch) { 138 count++; 139 Object msg = entry.getValue(); 140 lastBatchId = (MessageId)entry.getKey(); 141 if (msg.getClass() == MessageId.class) { 142 listener.recoverMessageReference((MessageId)msg); 143 } else { 144 listener.recoverMessage((Message)msg); 145 } 146 } else { 147 pastLackBatch = entry.getKey().equals(lastBatchId); 148 } 149 } 150 } 151 } 152 153 public void resetBatching() { 154 lastBatchId = null; 155 } 156 157 @Override 158 public void setBatch(MessageId messageId) { 159 lastBatchId = messageId; 160 } 161 162 public void updateMessage(Message message) { 163 synchronized (messageTable) { 164 messageTable.put(message.getMessageId(), message); 165 } 166 } 167 168}