001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.LinkedHashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.command.TransactionId;
034import org.apache.activemq.command.XATransactionId;
035import org.apache.activemq.store.InlineListenableFuture;
036import org.apache.activemq.store.ListenableFuture;
037import org.apache.activemq.store.MessageStore;
038import org.apache.activemq.store.PersistenceAdapter;
039import org.apache.activemq.store.ProxyMessageStore;
040import org.apache.activemq.store.ProxyTopicMessageStore;
041import org.apache.activemq.store.TopicMessageStore;
042import org.apache.activemq.store.TransactionRecoveryListener;
043import org.apache.activemq.store.TransactionStore;
044
/**
 * Provides a TransactionStore implementation that can create transaction-aware
 * MessageStore objects from non-transaction-aware MessageStore objects.
 */
051public class MemoryTransactionStore implements TransactionStore {
052
053    protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
054    protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
055    protected final PersistenceAdapter persistenceAdapter;
056
057    private boolean doingRecover;
058
059    public class Tx {
060
061        public List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
062
063        public final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
064
065        public void add(AddMessageCommand msg) {
066            messages.add(msg);
067        }
068
069        public void add(RemoveMessageCommand ack) {
070            acks.add(ack);
071        }
072
073        public Message[] getMessages() {
074            Message rc[] = new Message[messages.size()];
075            int count = 0;
076            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
077                AddMessageCommand cmd = iter.next();
078                rc[count++] = cmd.getMessage();
079            }
080            return rc;
081        }
082
083        public MessageAck[] getAcks() {
084            MessageAck rc[] = new MessageAck[acks.size()];
085            int count = 0;
086            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
087                RemoveMessageCommand cmd = iter.next();
088                rc[count++] = cmd.getMessageAck();
089            }
090            return rc;
091        }
092
093        /**
094         * @throws IOException
095         */
096        public void commit() throws IOException {
097            ConnectionContext ctx = new ConnectionContext();
098            persistenceAdapter.beginTransaction(ctx);
099            try {
100
101                // Do all the message adds.
102                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
103                    AddMessageCommand cmd = iter.next();
104                    cmd.run(ctx);
105                }
106                // And removes..
107                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
108                    RemoveMessageCommand cmd = iter.next();
109                    cmd.run(ctx);
110                }
111
112                persistenceAdapter.commitTransaction(ctx);
113
114            } catch (IOException e) {
115                persistenceAdapter.rollbackTransaction(ctx);
116                throw e;
117            }
118        }
119    }
120
121    public interface AddMessageCommand {
122        Message getMessage();
123
124        MessageStore getMessageStore();
125
126        void run(ConnectionContext context) throws IOException;
127
128        void setMessageStore(MessageStore messageStore);
129    }
130
131    public interface RemoveMessageCommand {
132        MessageAck getMessageAck();
133
134        void run(ConnectionContext context) throws IOException;
135
136        MessageStore getMessageStore();
137    }
138
139    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
140        this.persistenceAdapter=persistenceAdapter;
141    }
142
143    public MessageStore proxy(MessageStore messageStore) {
144        ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
145            @Override
146            public void addMessage(ConnectionContext context, final Message send) throws IOException {
147                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
148            }
149
150            @Override
151            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
152                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
153            }
154
155            @Override
156            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
157                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
158                return new InlineListenableFuture();
159             }
160
161            @Override
162            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
163                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
164                return new InlineListenableFuture();
165             }
166
167            @Override
168            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
169                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
170            }
171
172            @Override
173            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
174                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
175            }
176        };
177        onProxyQueueStore(proxyMessageStore);
178        return proxyMessageStore;
179    }
180
181    protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) {
182    }
183
184    public TopicMessageStore proxy(TopicMessageStore messageStore) {
185        ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
186            @Override
187            public void addMessage(ConnectionContext context, final Message send) throws IOException {
188                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
189            }
190
191            @Override
192            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
193                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
194            }
195
196            @Override
197            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
198                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
199                return new InlineListenableFuture();
200             }
201
202            @Override
203            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
204                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
205                return new InlineListenableFuture();
206             }
207
208            @Override
209            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
210                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
211            }
212
213            @Override
214            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
215                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
216            }
217
218            @Override
219            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
220                            MessageId messageId, MessageAck ack) throws IOException {
221                MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
222                        subscriptionName, messageId, ack);
223            }
224        };
225        onProxyTopicStore(proxyTopicMessageStore);
226        return proxyTopicMessageStore;
227    }
228
229    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
230    }
231
232    /**
233     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
234     */
235    @Override
236    public void prepare(TransactionId txid) throws IOException {
237        Tx tx = inflightTransactions.remove(txid);
238        if (tx == null) {
239            return;
240        }
241        preparedTransactions.put(txid, tx);
242    }
243
244    public Tx getTx(Object txid) {
245        Tx tx = inflightTransactions.get(txid);
246        if (tx == null) {
247            synchronized (inflightTransactions) {
248                tx = inflightTransactions.get(txid);
249                if ( tx == null) {
250                    tx = new Tx();
251                    inflightTransactions.put(txid, tx);
252                }
253            }
254        }
255        return tx;
256    }
257
258    public Tx getPreparedTx(TransactionId txid) {
259        Tx tx = preparedTransactions.get(txid);
260        if (tx == null) {
261            tx = new Tx();
262            preparedTransactions.put(txid, tx);
263        }
264        return tx;
265    }
266
267    @Override
268    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
269        if (preCommit != null) {
270            preCommit.run();
271        }
272        Tx tx;
273        if (wasPrepared) {
274            tx = preparedTransactions.get(txid);
275        } else {
276            tx = inflightTransactions.remove(txid);
277        }
278
279        if (tx != null) {
280            tx.commit();
281        }
282        if (wasPrepared) {
283            preparedTransactions.remove(txid);
284        }
285        if (postCommit != null) {
286            postCommit.run();
287        }
288    }
289
290    /**
291     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
292     */
293    @Override
294    public void rollback(TransactionId txid) throws IOException {
295        preparedTransactions.remove(txid);
296        inflightTransactions.remove(txid);
297    }
298
299    @Override
300    public void start() throws Exception {
301    }
302
303    @Override
304    public void stop() throws Exception {
305    }
306
307    @Override
308    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
309        // All the inflight transactions get rolled back..
310        inflightTransactions.clear();
311        this.doingRecover = true;
312        try {
313            for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
314                Object txid = iter.next();
315                Tx tx = preparedTransactions.get(txid);
316                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
317                onRecovered(tx);
318            }
319        } finally {
320            this.doingRecover = false;
321        }
322    }
323
324    protected void onRecovered(Tx tx) {
325    }
326
327    /**
328     * @param message
329     * @throws IOException
330     */
331    void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
332
333        if (doingRecover) {
334            return;
335        }
336
337        if (message.getTransactionId() != null) {
338            Tx tx = getTx(message.getTransactionId());
339            tx.add(new AddMessageCommand() {
340                MessageStore messageStore = destination;
341                @Override
342                public Message getMessage() {
343                    return message;
344                }
345
346                @Override
347                public MessageStore getMessageStore() {
348                    return destination;
349                }
350
351                @Override
352                public void run(ConnectionContext ctx) throws IOException {
353                    destination.addMessage(ctx, message);
354                }
355
356                @Override
357                public void setMessageStore(MessageStore messageStore) {
358                    this.messageStore = messageStore;
359                }
360
361            });
362        } else {
363            destination.addMessage(context, message);
364        }
365    }
366
367    /**
368     * @param ack
369     * @throws IOException
370     */
371    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
372        if (doingRecover) {
373            return;
374        }
375
376        if (ack.isInTransaction()) {
377            Tx tx = getTx(ack.getTransactionId());
378            tx.add(new RemoveMessageCommand() {
379                @Override
380                public MessageAck getMessageAck() {
381                    return ack;
382                }
383
384                @Override
385                public void run(ConnectionContext ctx) throws IOException {
386                    destination.removeMessage(ctx, ack);
387                }
388
389                @Override
390                public MessageStore getMessageStore() {
391                    return destination;
392                }
393            });
394        } else {
395            destination.removeMessage(null, ack);
396        }
397    }
398
399    public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
400                           final MessageId messageId, final MessageAck ack) throws IOException {
401        if (doingRecover) {
402            return;
403        }
404
405        if (ack.isInTransaction()) {
406            Tx tx = getTx(ack.getTransactionId());
407            tx.add(new RemoveMessageCommand() {
408                @Override
409                public MessageAck getMessageAck() {
410                    return ack;
411                }
412
413                @Override
414                public void run(ConnectionContext ctx) throws IOException {
415                    destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
416                }
417
418                @Override
419                public MessageStore getMessageStore() {
420                    return destination;
421                }
422            });
423        } else {
424            destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
425        }
426    }
427
428
429    public void delete() {
430        inflightTransactions.clear();
431        preparedTransactions.clear();
432        doingRecover = false;
433    }
434
435}