001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.ArrayList;
023import java.util.LinkedList;
024import java.util.Map;
025import java.util.TreeMap;
026
027import org.apache.activemq.ActiveMQMessageAudit;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageId;
033import org.apache.activemq.command.XATransactionId;
034import org.apache.activemq.store.AbstractMessageStore;
035import org.apache.activemq.store.IndexListener;
036import org.apache.activemq.store.MessageRecoveryListener;
037import org.apache.activemq.util.ByteSequence;
038import org.apache.activemq.util.ByteSequenceData;
039import org.apache.activemq.util.IOExceptionSupport;
040import org.apache.activemq.wireformat.WireFormat;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 *
046 */
047public class JDBCMessageStore extends AbstractMessageStore {
048
049    class Duration {
050        static final int LIMIT = 100;
051        final long start = System.currentTimeMillis();
052        final String name;
053
054        Duration(String name) {
055            this.name = name;
056        }
057        void end() {
058            end(null);
059        }
060        void end(Object o) {
061            long duration = System.currentTimeMillis() - start;
062
063            if (duration > LIMIT) {
064                System.err.println(name + " took a long time: " + duration + "ms " + o);
065            }
066        }
067    }
068    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
069    protected final WireFormat wireFormat;
070    protected final JDBCAdapter adapter;
071    protected final JDBCPersistenceAdapter persistenceAdapter;
072    protected ActiveMQMessageAudit audit;
073    protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
074    protected final TreeMap<Long, Message> rolledBackAcks = new TreeMap<Long, Message>();
075    final long[] perPriorityLastRecovered = new long[10];
076
077    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
078        super(destination);
079        this.persistenceAdapter = persistenceAdapter;
080        this.adapter = adapter;
081        this.wireFormat = wireFormat;
082        this.audit = audit;
083
084        if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
085            recordDestinationCreation(destination);
086        }
087        resetBatching();
088    }
089
090    private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
091        TransactionContext c = persistenceAdapter.getTransactionContext();
092        try {
093            c = persistenceAdapter.getTransactionContext();
094            if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
095                adapter.doRecordDestination(c, destination);
096            }
097        } catch (SQLException e) {
098            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
099            throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
100        } finally {
101            c.close();
102        }
103    }
104
105    @Override
106    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
107        MessageId messageId = message.getMessageId();
108        if (audit != null && audit.isDuplicate(message)) {
109            if (LOG.isDebugEnabled()) {
110                LOG.debug(destination.getPhysicalName()
111                    + " ignoring duplicated (add) message, already stored: "
112                    + messageId);
113            }
114            return;
115        }
116
117        // if xaXid present - this is a prepare - so we don't yet have an outcome
118        final XATransactionId xaXid =  context != null ? context.getXid() : null;
119
120        // Serialize the Message..
121        byte data[];
122        try {
123            ByteSequence packet = wireFormat.marshal(message);
124            data = ByteSequenceData.toByteArray(packet);
125        } catch (IOException e) {
126            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
127        }
128
129        // Get a connection and insert the message into the DB.
130        TransactionContext c = persistenceAdapter.getTransactionContext(context);
131        long sequenceId;
132        synchronized (pendingAdditions) {
133            sequenceId = persistenceAdapter.getNextSequenceId();
134            final long sequence = sequenceId;
135            message.getMessageId().setEntryLocator(sequence);
136
137            if (xaXid == null) {
138                pendingAdditions.add(sequence);
139
140                c.onCompletion(new Runnable() {
141                    @Override
142                    public void run() {
143                        // jdbc close or jms commit - while futureOrSequenceLong==null ordered
144                        // work will remain pending on the Queue
145                        message.getMessageId().setFutureOrSequenceLong(sequence);
146                    }
147                });
148
149                if (indexListener != null) {
150                    indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
151                        @Override
152                        public void run() {
153                            // cursor add complete
154                            synchronized (pendingAdditions) { pendingAdditions.remove(sequence); }
155                        }
156                    }));
157                } else {
158                    pendingAdditions.remove(sequence);
159                }
160            }
161        }
162        try {
163            adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
164                    this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid);
165        } catch (SQLException e) {
166            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
167            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
168        } finally {
169            c.close();
170        }
171        if (xaXid == null) {
172            onAdd(message, sequenceId, message.getPriority());
173        }
174    }
175
176    // jdbc commit order is random with concurrent connections - limit scan to lowest pending
177    private long minPendingSequeunceId() {
178        synchronized (pendingAdditions) {
179            if (!pendingAdditions.isEmpty()) {
180                return pendingAdditions.get(0);
181            } else {
182                // nothing pending, ensure scan is limited to current state
183                return persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1;
184            }
185        }
186    }
187
188    @Override
189    public void updateMessage(Message message) throws IOException {
190        TransactionContext c = persistenceAdapter.getTransactionContext();
191        try {
192            adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message)));
193        } catch (SQLException e) {
194            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
195            throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e);
196        } finally {
197            c.close();
198        }
199    }
200
201    protected void onAdd(Message message, long sequenceId, byte priority) {}
202
203    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
204        // Get a connection and insert the message into the DB.
205        TransactionContext c = persistenceAdapter.getTransactionContext(context);
206        try {
207            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
208        } catch (SQLException e) {
209            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
210            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
211        } finally {
212            c.close();
213        }
214    }
215
216    @Override
217    public Message getMessage(MessageId messageId) throws IOException {
218        // Get a connection and pull the message out of the DB
219        TransactionContext c = persistenceAdapter.getTransactionContext();
220        try {
221            byte data[] = adapter.doGetMessage(c, messageId);
222            if (data == null) {
223                return null;
224            }
225
226            Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
227            return answer;
228        } catch (IOException e) {
229            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
230        } catch (SQLException e) {
231            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
232            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
233        } finally {
234            c.close();
235        }
236    }
237
238    public String getMessageReference(MessageId messageId) throws IOException {
239        long id = messageId.getBrokerSequenceId();
240
241        // Get a connection and pull the message out of the DB
242        TransactionContext c = persistenceAdapter.getTransactionContext();
243        try {
244            return adapter.doGetMessageReference(c, id);
245        } catch (IOException e) {
246            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
247        } catch (SQLException e) {
248            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
249            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
250        } finally {
251            c.close();
252        }
253    }
254
255    @Override
256    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
257
258        long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ?
259                (Long) ack.getLastMessageId().getFutureOrSequenceLong() :
260                persistenceAdapter.getStoreSequenceIdForMessageId(context, ack.getLastMessageId(), destination)[0];
261
262        // Get a connection and remove the message from the DB
263        TransactionContext c = persistenceAdapter.getTransactionContext(context);
264        try {
265            adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
266        } catch (SQLException e) {
267            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
268            throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
269        } finally {
270            c.close();
271        }
272    }
273
274    @Override
275    public void recover(final MessageRecoveryListener listener) throws Exception {
276
277        // Get all the Message ids out of the database.
278        TransactionContext c = persistenceAdapter.getTransactionContext();
279        try {
280            c = persistenceAdapter.getTransactionContext();
281            adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
282                @Override
283                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
284                    if (listener.hasSpace()) {
285                        Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
286                        msg.getMessageId().setBrokerSequenceId(sequenceId);
287                        return listener.recoverMessage(msg);
288                    } else {
289                        if (LOG.isTraceEnabled()) {
290                            LOG.trace("Message recovery limit reached for MessageRecoveryListener");
291                        }
292                        return false;
293                    }
294                }
295
296                @Override
297                public boolean recoverMessageReference(String reference) throws Exception {
298                    if (listener.hasSpace()) {
299                        return listener.recoverMessageReference(new MessageId(reference));
300                    } else {
301                        if (LOG.isTraceEnabled()) {
302                            LOG.trace("Message recovery limit reached for MessageRecoveryListener");
303                        }
304                        return false;
305                    }
306                }
307            });
308        } catch (SQLException e) {
309            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
310            throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
311        } finally {
312            c.close();
313        }
314    }
315
316    /**
317     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
318     */
319    @Override
320    public void removeAllMessages(ConnectionContext context) throws IOException {
321        // Get a connection and remove the message from the DB
322        TransactionContext c = persistenceAdapter.getTransactionContext(context);
323        try {
324            adapter.doRemoveAllMessages(c, destination);
325        } catch (SQLException e) {
326            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
327            throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
328        } finally {
329            c.close();
330        }
331    }
332
333    public int getMessageCount() throws IOException {
334        int result = 0;
335        TransactionContext c = persistenceAdapter.getTransactionContext();
336        try {
337
338            result = adapter.doGetMessageCount(c, destination);
339
340        } catch (SQLException e) {
341            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
342            throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
343        } finally {
344            c.close();
345        }
346        return result;
347    }
348
349    /**
350     * @param maxReturned
351     * @param listener
352     * @throws Exception
353     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
354     *      org.apache.activemq.store.MessageRecoveryListener)
355     */
356    @Override
357    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
358        TransactionContext c = persistenceAdapter.getTransactionContext();
359        try {
360            if (LOG.isTraceEnabled()) {
361                LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId());
362            }
363
364            maxReturned -= recoverRolledBackAcks(maxReturned, listener);
365
366            adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
367                    maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
368
369                @Override
370                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
371                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
372                        msg.getMessageId().setBrokerSequenceId(sequenceId);
373                        msg.getMessageId().setFutureOrSequenceLong(sequenceId);
374                        msg.getMessageId().setEntryLocator(sequenceId);
375                        listener.recoverMessage(msg);
376                        trackLastRecovered(sequenceId, msg.getPriority());
377                        return true;
378                }
379
380                @Override
381                public boolean recoverMessageReference(String reference) throws Exception {
382                    if (listener.hasSpace()) {
383                        listener.recoverMessageReference(new MessageId(reference));
384                        return true;
385                    }
386                    return false;
387                }
388
389            });
390        } catch (SQLException e) {
391            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
392        } finally {
393            c.close();
394        }
395
396    }
397
398    public void trackRollbackAck(Message message) {
399        synchronized (rolledBackAcks) {
400            rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
401        }
402    }
403
404    private int recoverRolledBackAcks(int max, MessageRecoveryListener listener) throws Exception {
405        int recovered = 0;
406        ArrayList<Long> toRemove = new ArrayList<Long>();
407        synchronized (rolledBackAcks) {
408            if (!rolledBackAcks.isEmpty()) {
409                for ( Map.Entry<Long,Message> candidate : rolledBackAcks.entrySet()) {
410                    if (candidate.getKey() <= lastRecovered(candidate.getValue().getPriority())) {
411                        listener.recoverMessage(candidate.getValue());
412                        recovered++;
413                        toRemove.add(candidate.getKey());
414                        if (recovered == max) {
415                            break;
416                        }
417                    } else {
418                        toRemove.add(candidate.getKey());
419                    }
420                }
421                for (Long key : toRemove) {
422                    rolledBackAcks.remove(key);
423                }
424            }
425        }
426        return recovered;
427    }
428
429    private long lastRecovered(int priority) {
430        return perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0];
431    }
432
433    private void trackLastRecovered(long sequenceId, int priority) {
434        perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId;
435    }
436
437    /**
438     * @see org.apache.activemq.store.MessageStore#resetBatching()
439     */
440    @Override
441    public void resetBatching() {
442        if (LOG.isTraceEnabled()) {
443            LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
444        }
445        setLastRecovered(-1);
446    }
447
448    private void setLastRecovered(long val) {
449        for (int i=0;i<perPriorityLastRecovered.length;i++) {
450            perPriorityLastRecovered[i] = val;
451        }
452    }
453
454
455    @Override
456    public void setBatch(MessageId messageId) {
457        if (LOG.isTraceEnabled()) {
458            LOG.trace(this + " setBatch: last recovered: " + Arrays.toString(perPriorityLastRecovered));
459        }
460        try {
461            long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination);
462            setLastRecovered(storedValues[0]);
463        } catch (IOException ignoredAsAlreadyLogged) {
464            resetBatching();
465        }
466        if (LOG.isTraceEnabled()) {
467            LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered));
468        }
469    }
470
471
472    @Override
473    public void setPrioritizedMessages(boolean prioritizedMessages) {
474        super.setPrioritizedMessages(prioritizedMessages);
475    }
476
477    @Override
478    public String toString() {
479        return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
480    }
481}