001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028
029import org.apache.activeio.journal.RecordLocation;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.JournalQueueAck;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageId;
036import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
037import org.apache.activemq.store.IndexListener;
038import org.apache.activemq.store.MessageRecoveryListener;
039import org.apache.activemq.store.MessageStore;
040import org.apache.activemq.store.PersistenceAdapter;
041import org.apache.activemq.store.AbstractMessageStore;
042import org.apache.activemq.transaction.Synchronization;
043import org.apache.activemq.usage.MemoryUsage;
044import org.apache.activemq.util.Callback;
045import org.apache.activemq.util.TransactionTemplate;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
 * A MessageStore that uses a Journal to store its messages.
051 * 
052 * 
053 */
054public class JournalMessageStore extends AbstractMessageStore {
055
056    private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
057
058    protected final JournalPersistenceAdapter peristenceAdapter;
059    protected final JournalTransactionStore transactionStore;
060    protected final MessageStore longTermStore;
061    protected final TransactionTemplate transactionTemplate;
062    protected RecordLocation lastLocation;
063    protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
064
065    private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
066    private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
067
    /** Messages handed to an in-progress checkpoint; {@code null} while no checkpoint is running. */
069    private Map<MessageId, Message> cpAddedMessageIds;
070
071
072    private MemoryUsage memoryUsage;
073
074    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
075        super(destination);
076        this.peristenceAdapter = adapter;
077        this.transactionStore = adapter.getTransactionStore();
078        this.longTermStore = checkpointStore;
079        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
080    }
081
082    
083    public void setMemoryUsage(MemoryUsage memoryUsage) {
084        this.memoryUsage=memoryUsage;
085        longTermStore.setMemoryUsage(memoryUsage);
086    }
087
088    /**
089     * Not synchronized since the Journal has better throughput if you increase
090     * the number of concurrent writes that it is doing.
091     */
092    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
093
094        final MessageId id = message.getMessageId();
095
096        final boolean debug = LOG.isDebugEnabled();
097        message.incrementReferenceCount();
098
099        final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
100        if (!context.isInTransaction()) {
101            if (debug) {
102                LOG.debug("Journalled message add for: " + id + ", at: " + location);
103            }
104            addMessage(context, message, location);
105        } else {
106            if (debug) {
107                LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
108            }
109            synchronized (this) {
110                inFlightTxLocations.add(location);
111            }
112            transactionStore.addMessage(this, message, location);
113            context.getTransaction().addSynchronization(new Synchronization() {
114                public void afterCommit() throws Exception {
115                    if (debug) {
116                        LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
117                    }
118                    synchronized (JournalMessageStore.this) {
119                        inFlightTxLocations.remove(location);
120                        addMessage(context, message, location);
121                    }
122                }
123
124                public void afterRollback() throws Exception {
125                    if (debug) {
126                        LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
127                    }
128                    synchronized (JournalMessageStore.this) {
129                        inFlightTxLocations.remove(location);
130                    }
131                    message.decrementReferenceCount();
132                }
133            });
134        }
135    }
136
137    void addMessage(ConnectionContext context, final Message message, final RecordLocation location) {
138        synchronized (this) {
139            lastLocation = location;
140            MessageId id = message.getMessageId();
141            messages.put(id, message);
142            message.getMessageId().setFutureOrSequenceLong(0l);
143            if (indexListener != null) {
144                indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
145            }
146        }
147    }
148
149    public void replayAddMessage(ConnectionContext context, Message message) {
150        try {
151            // Only add the message if it has not already been added.
152            Message t = longTermStore.getMessage(message.getMessageId());
153            if (t == null) {
154                longTermStore.addMessage(context, message);
155            }
156        } catch (Throwable e) {
157            LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
158        }
159    }
160
161    /**
162     */
163    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
164        final boolean debug = LOG.isDebugEnabled();
165        JournalQueueAck remove = new JournalQueueAck();
166        remove.setDestination(destination);
167        remove.setMessageAck(ack);
168
169        final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
170        if (!context.isInTransaction()) {
171            if (debug) {
172                LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
173            }
174            removeMessage(ack, location);
175        } else {
176            if (debug) {
177                LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
178            }
179            synchronized (this) {
180                inFlightTxLocations.add(location);
181            }
182            transactionStore.removeMessage(this, ack, location);
183            context.getTransaction().addSynchronization(new Synchronization() {
184                public void afterCommit() throws Exception {
185                    if (debug) {
186                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
187                    }
188                    synchronized (JournalMessageStore.this) {
189                        inFlightTxLocations.remove(location);
190                        removeMessage(ack, location);
191                    }
192                }
193
194                public void afterRollback() throws Exception {
195                    if (debug) {
196                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
197                    }
198                    synchronized (JournalMessageStore.this) {
199                        inFlightTxLocations.remove(location);
200                    }
201                }
202            });
203
204        }
205    }
206
207    final void removeMessage(final MessageAck ack, final RecordLocation location) {
208        synchronized (this) {
209            lastLocation = location;
210            MessageId id = ack.getLastMessageId();
211            Message message = messages.remove(id);
212            if (message == null) {
213                messageAcks.add(ack);
214            } else {
215                message.decrementReferenceCount();
216            }
217        }
218    }
219
220    public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
221        try {
222            // Only remove the message if it has not already been removed.
223            Message t = longTermStore.getMessage(messageAck.getLastMessageId());
224            if (t != null) {
225                longTermStore.removeMessage(context, messageAck);
226            }
227        } catch (Throwable e) {
228            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
229        }
230    }
231
232    /**
233     * @return
234     * @throws IOException
235     */
236    public RecordLocation checkpoint() throws IOException {
237        return checkpoint(null);
238    }
239
240    /**
241     * @return
242     * @throws IOException
243     */
244    @SuppressWarnings("unchecked")
245    public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
246
247        final List<MessageAck> cpRemovedMessageLocations;
248        final List<RecordLocation> cpActiveJournalLocations;
249        final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
250
251        // swap out the message hash maps..
252        synchronized (this) {
253            cpAddedMessageIds = this.messages;
254            cpRemovedMessageLocations = this.messageAcks;
255
256            cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
257
258            this.messages = new LinkedHashMap<MessageId, Message>();
259            this.messageAcks = new ArrayList<MessageAck>();
260        }
261
262        transactionTemplate.run(new Callback() {
263            public void execute() throws Exception {
264
265                int size = 0;
266
267                PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
268                ConnectionContext context = transactionTemplate.getContext();
269
270                // Checkpoint the added messages.
271                synchronized (JournalMessageStore.this) {
272                    Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
273                    while (iterator.hasNext()) {
274                        Message message = iterator.next();
275                        try {
276                            longTermStore.addMessage(context, message);
277                        } catch (Throwable e) {
278                            LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
279                        }
280                        size += message.getSize();
281                        message.decrementReferenceCount();
282                        // Commit the batch if it's getting too big
283                        if (size >= maxCheckpointMessageAddSize) {
284                            persitanceAdapter.commitTransaction(context);
285                            persitanceAdapter.beginTransaction(context);
286                            size = 0;
287                        }
288                    }
289                }
290
291                persitanceAdapter.commitTransaction(context);
292                persitanceAdapter.beginTransaction(context);
293
294                // Checkpoint the removed messages.
295                Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
296                while (iterator.hasNext()) {
297                    try {
298                        MessageAck ack = iterator.next();
299                        longTermStore.removeMessage(transactionTemplate.getContext(), ack);
300                    } catch (Throwable e) {
301                        LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
302                    }
303                }
304
305                if (postCheckpointTest != null) {
306                    postCheckpointTest.execute();
307                }
308            }
309
310        });
311
312        synchronized (this) {
313            cpAddedMessageIds = null;
314        }
315
316        if (cpActiveJournalLocations.size() > 0) {
317            Collections.sort(cpActiveJournalLocations);
318            return cpActiveJournalLocations.get(0);
319        }
320        synchronized (this) {
321            return lastLocation;
322        }
323    }
324
325    /**
326     * 
327     */
328    public Message getMessage(MessageId identity) throws IOException {
329        Message answer = null;
330
331        synchronized (this) {
332            // Do we have a still have it in the journal?
333            answer = messages.get(identity);
334            if (answer == null && cpAddedMessageIds != null) {
335                answer = cpAddedMessageIds.get(identity);
336            }
337        }
338
339        if (answer != null) {
340            return answer;
341        }
342
343        // If all else fails try the long term message store.
344        return longTermStore.getMessage(identity);
345    }
346
347    /**
348     * Replays the checkpointStore first as those messages are the oldest ones,
349     * then messages are replayed from the transaction log and then the cache is
350     * updated.
351     * 
352     * @param listener
353     * @throws Exception
354     */
355    public void recover(final MessageRecoveryListener listener) throws Exception {
356        peristenceAdapter.checkpoint(true, true);
357        longTermStore.recover(listener);
358    }
359
360    public void start() throws Exception {
361        if (this.memoryUsage != null) {
362            this.memoryUsage.addUsageListener(peristenceAdapter);
363        }
364        longTermStore.start();
365    }
366
367    public void stop() throws Exception {
368        longTermStore.stop();
369        if (this.memoryUsage != null) {
370            this.memoryUsage.removeUsageListener(peristenceAdapter);
371        }
372    }
373
374    /**
375     * @return Returns the longTermStore.
376     */
377    public MessageStore getLongTermMessageStore() {
378        return longTermStore;
379    }
380
381    /**
382     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
383     */
384    public void removeAllMessages(ConnectionContext context) throws IOException {
385        peristenceAdapter.checkpoint(true, true);
386        longTermStore.removeAllMessages(context);
387    }
388
389    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
390        throw new IOException("The journal does not support message references.");
391    }
392
393    public String getMessageReference(MessageId identity) throws IOException {
394        throw new IOException("The journal does not support message references.");
395    }
396
397    /**
398     * @return
399     * @throws IOException
400     * @see org.apache.activemq.store.MessageStore#getMessageCount()
401     */
402    public int getMessageCount() throws IOException {
403        peristenceAdapter.checkpoint(true, true);
404        return longTermStore.getMessageCount();
405    }
406
407    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
408        peristenceAdapter.checkpoint(true, true);
409        longTermStore.recoverNextMessages(maxReturned, listener);
410
411    }
412
413    public void resetBatching() {
414        longTermStore.resetBatching();
415
416    }
417
418    @Override
419    public void setBatch(MessageId messageId) throws Exception {
420        peristenceAdapter.checkpoint(true, true);
421        longTermStore.setBatch(messageId);
422    }
423
424}