001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Arrays;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.LinkedHashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.ActiveMQMessageAudit;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.MessageAck;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.SubscriptionInfo;
038import org.apache.activemq.store.MessageRecoveryListener;
039import org.apache.activemq.store.TopicMessageStore;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.IOExceptionSupport;
042import org.apache.activemq.wireformat.WireFormat;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * 
048 */
049public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
050
051    private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
052    private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
053    private Set<String> pendingCompletion = new HashSet<String>();
054
055    public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
056    private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
057               PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
058    private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock();
059    private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() {
060         protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) {
061           return size() > SEQUENCE_ID_CACHE_SIZE;
062        }
063    };
064
065
066    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
067        super(persistenceAdapter, adapter, wireFormat, topic, audit);
068    }
069
070    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
071        if (ack != null && ack.isUnmatchedAck()) {
072            if (LOG.isTraceEnabled()) {
073                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
074            }
075            return;
076        }
077        TransactionContext c = persistenceAdapter.getTransactionContext(context);
078        try {
079            long[] res = getCachedStoreSequenceId(c, destination, messageId);
080            if (this.isPrioritizedMessages()) {
081                adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
082            } else {
083                adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
084            }
085            if (LOG.isTraceEnabled()) {
086                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
087            }
088        } catch (SQLException e) {
089            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
090            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
091        } finally {
092            c.close();
093        }
094    }
095
096    public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
097        long[] val = null;
098        sequenceIdCacheSizeLock.readLock().lock();
099        try {
100            val = sequenceIdCache.get(messageId);
101        } finally {
102            sequenceIdCacheSizeLock.readLock().unlock();
103        }
104        if (val == null) {
105            val = adapter.getStoreSequenceId(transactionContext, destination, messageId);
106        }
107        return val;
108    }
109
110    /**
111     * @throws Exception
112     */
113    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
114        TransactionContext c = persistenceAdapter.getTransactionContext();
115        try {
116            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
117                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
118                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
119                    msg.getMessageId().setBrokerSequenceId(sequenceId);
120                    return listener.recoverMessage(msg);
121                }
122
123                public boolean recoverMessageReference(String reference) throws Exception {
124                    return listener.recoverMessageReference(new MessageId(reference));
125                }
126
127            });
128        } catch (SQLException e) {
129            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
130            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
131        } finally {
132            c.close();
133        }
134    }
135
136    private class LastRecovered implements Iterable<LastRecoveredEntry> {
137        LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
138        LastRecovered() {
139            for (int i=0; i<perPriority.length; i++) {
140                perPriority[i] = new LastRecoveredEntry(i);
141            }
142        }
143
144        public void updateStored(long sequence, int priority) {
145            perPriority[priority].stored = sequence;
146        }
147
148        public LastRecoveredEntry defaultPriority() {
149            return perPriority[0];
150        }
151
152        public String toString() {
153            return Arrays.deepToString(perPriority);
154        }
155
156        public Iterator<LastRecoveredEntry> iterator() {
157            return new PriorityIterator();
158        }
159
160        class PriorityIterator implements Iterator<LastRecoveredEntry> {
161            int current = 9;
162            public boolean hasNext() {
163                for (int i=current; i>=0; i--) {
164                    if (perPriority[i].hasMessages()) {
165                        current = i;
166                        return true;
167                    }
168                }
169                return false;
170            }
171
172            public LastRecoveredEntry next() {
173                return perPriority[current];
174            }
175
176            public void remove() {
177                throw new RuntimeException("not implemented");
178            }
179        }
180    }
181
182    private class LastRecoveredEntry {
183        final int priority;
184        long recovered = 0;
185        long stored = Integer.MAX_VALUE;
186
187        public LastRecoveredEntry(int priority) {
188            this.priority = priority;
189        }
190
191        public String toString() {
192            return priority + "-" + stored + ":" + recovered;
193        }
194
195        public void exhausted() {
196            stored = recovered;
197        }
198
199        public boolean hasMessages() {
200            return stored > recovered;
201        }
202    }
203
204    class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
205        final MessageRecoveryListener delegate;
206        final int maxMessages;
207        LastRecoveredEntry lastRecovered;
208        int recoveredCount;
209        int recoveredMarker;
210
211        public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
212            this.delegate = delegate;
213            this.maxMessages = maxMessages;
214        }
215
216        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
217            if (delegate.hasSpace() && recoveredCount < maxMessages) {
218                Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
219                msg.getMessageId().setBrokerSequenceId(sequenceId);
220                lastRecovered.recovered = sequenceId;
221                if (delegate.recoverMessage(msg)) {
222                    recoveredCount++;
223                    return true;
224                }
225            }
226            return false;
227        }
228
229        public boolean recoverMessageReference(String reference) throws Exception {
230            return delegate.recoverMessageReference(new MessageId(reference));
231        }
232
233        public void setLastRecovered(LastRecoveredEntry lastRecovered) {
234            this.lastRecovered = lastRecovered;
235            recoveredMarker = recoveredCount;
236        }
237
238        public boolean complete() {
239            return  !delegate.hasSpace() || recoveredCount == maxMessages;
240        }
241
242        public boolean stalled() {
243            return recoveredMarker == recoveredCount;
244        }
245    }
246
247    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
248            throws Exception {
249        //Duration duration = new Duration("recoverNextMessages");
250        TransactionContext c = persistenceAdapter.getTransactionContext();
251
252        String key = getSubscriptionKey(clientId, subscriptionName);
253        if (!subscriberLastRecoveredMap.containsKey(key)) {
254           subscriberLastRecoveredMap.put(key, new LastRecovered());
255        }
256        final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
257        LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
258        try {
259            if (LOG.isTraceEnabled()) {
260                LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
261            }
262            if (isPrioritizedMessages()) {
263                Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
264                for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
265                    LastRecoveredEntry entry = it.next();
266                    recoveredAwareListener.setLastRecovered(entry);
267                    //Duration microDuration = new Duration("recoverNextMessages:loop");
268                    adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
269                        entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
270                    //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount));
271                    if (recoveredAwareListener.stalled()) {
272                        if (recoveredAwareListener.complete()) {
273                            break;
274                        } else {
275                            entry.exhausted();
276                        }
277                    }
278                }
279            } else {
280                LastRecoveredEntry last = lastRecovered.defaultPriority();
281                recoveredAwareListener.setLastRecovered(last);
282                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
283                        last.recovered, 0, maxReturned, recoveredAwareListener);
284            }
285            if (LOG.isTraceEnabled()) {
286                LOG.trace(key + " last recovered: " + lastRecovered);
287            }
288            //duration.end();
289        } catch (SQLException e) {
290            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
291        } finally {
292            c.close();
293        }
294    }
295
296    public void resetBatching(String clientId, String subscriptionName) {
297        String key = getSubscriptionKey(clientId, subscriptionName);
298        if (!pendingCompletion.contains(key))  {
299            subscriberLastRecoveredMap.remove(key);
300        } else {
301            LOG.trace(this +  ", skip resetBatch during pending completion for: " + key);
302        }
303    }
304
305    public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
306        final String key = getSubscriptionKey(clientId, subscriptionName);
307        LastRecovered recovered = new LastRecovered();
308        recovered.perPriority[priority].recovered = sequenceId;
309        subscriberLastRecoveredMap.put(key, recovered);
310        pendingCompletion.add(key);
311        LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
312    }
313
314    public void complete(String clientId, String subscriptionName) {
315        pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
316        LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
317    }
318
319    @Override
320    protected void onAdd(Message message, long sequenceId, byte priority) {
321        // update last recovered state
322        for (LastRecovered last : subscriberLastRecoveredMap.values()) {
323            last.updateStored(sequenceId, priority);
324        }
325        sequenceIdCacheSizeLock.writeLock().lock();
326        try {
327            sequenceIdCache.put(message.getMessageId(), new long[]{sequenceId, priority});
328        } finally {
329            sequenceIdCacheSizeLock.writeLock().unlock();
330        }
331    }
332
333    public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
334        TransactionContext c = persistenceAdapter.getTransactionContext();
335        try {
336            c = persistenceAdapter.getTransactionContext();
337            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
338        } catch (SQLException e) {
339            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
340            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
341        } finally {
342            c.close();
343        }
344    }
345
346    /**
347     * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
348     *      String)
349     */
350    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
351        TransactionContext c = persistenceAdapter.getTransactionContext();
352        try {
353            return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
354        } catch (SQLException e) {
355            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
356            throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
357        } finally {
358            c.close();
359        }
360    }
361
362    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
363        TransactionContext c = persistenceAdapter.getTransactionContext();
364        try {
365            adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
366        } catch (SQLException e) {
367            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
368            throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
369        } finally {
370            c.close();
371            resetBatching(clientId, subscriptionName);
372        }
373    }
374
375    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
376        TransactionContext c = persistenceAdapter.getTransactionContext();
377        try {
378            return adapter.doGetAllSubscriptions(c, destination);
379        } catch (SQLException e) {
380            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
381            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
382        } finally {
383            c.close();
384        }
385    }
386
387    public int getMessageCount(String clientId, String subscriberName) throws IOException {
388        //Duration duration = new Duration("getMessageCount");
389        int result = 0;
390        TransactionContext c = persistenceAdapter.getTransactionContext();
391        try {
392            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
393        } catch (SQLException e) {
394            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
395            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
396        } finally {
397            c.close();
398        }
399        if (LOG.isTraceEnabled()) {
400            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
401        }
402        //duration.end();
403        return result;
404    }
405
406    protected String getSubscriptionKey(String clientId, String subscriberName) {
407        String result = clientId + ":";
408        result += subscriberName != null ? subscriberName : "NOT_SET";
409        return result;
410    }
411
412}