001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Arrays; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.LinkedHashMap; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030import org.apache.activemq.ActiveMQMessageAudit; 031import org.apache.activemq.broker.ConnectionContext; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageId; 037import org.apache.activemq.command.SubscriptionInfo; 038import org.apache.activemq.store.MessageRecoveryListener; 039import org.apache.activemq.store.TopicMessageStore; 040import org.apache.activemq.util.ByteSequence; 041import org.apache.activemq.util.IOExceptionSupport; 042import org.apache.activemq.wireformat.WireFormat; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * 048 */ 049public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { 050 051 private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class); 052 private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>(); 053 private Set<String> pendingCompletion = new HashSet<String>(); 054 055 public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE"; 056 private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty( 057 PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10); 058 private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new ReentrantReadWriteLock(); 059 private Map<MessageId, long[]> sequenceIdCache = new LinkedHashMap<MessageId, long[]>() { 060 protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> eldest) { 061 return size() > SEQUENCE_ID_CACHE_SIZE; 062 } 063 }; 064 065 066 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException { 067 super(persistenceAdapter, adapter, wireFormat, topic, audit); 068 } 069 070 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 071 if (ack != null && ack.isUnmatchedAck()) { 072 if (LOG.isTraceEnabled()) { 073 LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks."); 074 } 075 return; 076 } 077 TransactionContext c = persistenceAdapter.getTransactionContext(context); 078 try { 079 long[] res = getCachedStoreSequenceId(c, destination, messageId); 080 if (this.isPrioritizedMessages()) { 081 adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]); 082 } else { 083 adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]); 084 } 085 if (LOG.isTraceEnabled()) { 086 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId); 087 } 088 } catch (SQLException e) { 089 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 090 throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e); 091 } finally { 092 c.close(); 093 } 094 } 095 096 public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException { 097 long[] val = null; 098 sequenceIdCacheSizeLock.readLock().lock(); 099 try { 100 val = sequenceIdCache.get(messageId); 101 } finally { 102 sequenceIdCacheSizeLock.readLock().unlock(); 103 } 104 if (val == null) { 105 val = adapter.getStoreSequenceId(transactionContext, destination, messageId); 106 } 107 return val; 108 } 109 110 /** 111 * @throws Exception 112 */ 113 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception { 114 TransactionContext c = persistenceAdapter.getTransactionContext(); 115 try { 116 adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() { 117 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 118 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 119 msg.getMessageId().setBrokerSequenceId(sequenceId); 120 return listener.recoverMessage(msg); 121 } 122 123 public boolean recoverMessageReference(String reference) throws Exception { 124 return listener.recoverMessageReference(new MessageId(reference)); 125 } 126 127 }); 128 } catch (SQLException e) { 129 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 130 throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e); 131 } finally { 132 c.close(); 133 } 134 } 135 136 private class LastRecovered implements Iterable<LastRecoveredEntry> { 137 LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10]; 138 LastRecovered() { 139 for (int i=0; i<perPriority.length; i++) { 140 perPriority[i] = new LastRecoveredEntry(i); 141 } 142 } 143 144 public void updateStored(long sequence, int priority) { 145 perPriority[priority].stored = sequence; 146 } 147 148 public LastRecoveredEntry defaultPriority() { 149 return perPriority[0]; 150 } 151 152 public String toString() { 153 return Arrays.deepToString(perPriority); 154 } 155 156 public Iterator<LastRecoveredEntry> iterator() { 157 return new PriorityIterator(); 158 } 159 160 class PriorityIterator implements Iterator<LastRecoveredEntry> { 161 int current = 9; 162 public boolean hasNext() { 163 for (int i=current; i>=0; i--) { 164 if (perPriority[i].hasMessages()) { 165 current = i; 166 return true; 167 } 168 } 169 return false; 170 } 171 172 public LastRecoveredEntry next() { 173 return perPriority[current]; 174 } 175 176 public void remove() { 177 throw new RuntimeException("not implemented"); 178 } 179 } 180 } 181 182 private class LastRecoveredEntry { 183 final int priority; 184 long recovered = 0; 185 long stored = Integer.MAX_VALUE; 186 187 public LastRecoveredEntry(int priority) { 188 this.priority = priority; 189 } 190 191 public String toString() { 192 return priority + "-" + stored + ":" + recovered; 193 } 194 195 public void exhausted() { 196 stored = recovered; 197 } 198 199 public boolean hasMessages() { 200 return stored > recovered; 201 } 202 } 203 204 class LastRecoveredAwareListener implements JDBCMessageRecoveryListener { 205 final MessageRecoveryListener delegate; 206 final int maxMessages; 207 LastRecoveredEntry lastRecovered; 208 int recoveredCount; 209 int recoveredMarker; 210 211 public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) { 212 this.delegate = delegate; 213 this.maxMessages = maxMessages; 214 } 215 216 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 217 if (delegate.hasSpace() && recoveredCount < maxMessages) { 218 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 219 msg.getMessageId().setBrokerSequenceId(sequenceId); 220 lastRecovered.recovered = sequenceId; 221 if (delegate.recoverMessage(msg)) { 222 recoveredCount++; 223 return true; 224 } 225 } 226 return false; 227 } 228 229 public boolean recoverMessageReference(String reference) throws Exception { 230 return delegate.recoverMessageReference(new MessageId(reference)); 231 } 232 233 public void setLastRecovered(LastRecoveredEntry lastRecovered) { 234 this.lastRecovered = lastRecovered; 235 recoveredMarker = recoveredCount; 236 } 237 238 public boolean complete() { 239 return !delegate.hasSpace() || recoveredCount == maxMessages; 240 } 241 242 public boolean stalled() { 243 return recoveredMarker == recoveredCount; 244 } 245 } 246 247 public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) 248 throws Exception { 249 //Duration duration = new Duration("recoverNextMessages"); 250 TransactionContext c = persistenceAdapter.getTransactionContext(); 251 252 String key = getSubscriptionKey(clientId, subscriptionName); 253 if (!subscriberLastRecoveredMap.containsKey(key)) { 254 subscriberLastRecoveredMap.put(key, new LastRecovered()); 255 } 256 final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key); 257 LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned); 258 try { 259 if (LOG.isTraceEnabled()) { 260 LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered); 261 } 262 if (isPrioritizedMessages()) { 263 Iterator<LastRecoveredEntry> it = lastRecovered.iterator(); 264 for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) { 265 LastRecoveredEntry entry = it.next(); 266 recoveredAwareListener.setLastRecovered(entry); 267 //Duration microDuration = new Duration("recoverNextMessages:loop"); 268 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, 269 entry.recovered, entry.priority, maxReturned, recoveredAwareListener); 270 //microDuration.end(new String(entry + " recoveredCount:" + recoveredAwareListener.recoveredCount)); 271 if (recoveredAwareListener.stalled()) { 272 if (recoveredAwareListener.complete()) { 273 break; 274 } else { 275 entry.exhausted(); 276 } 277 } 278 } 279 } else { 280 LastRecoveredEntry last = lastRecovered.defaultPriority(); 281 recoveredAwareListener.setLastRecovered(last); 282 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, 283 last.recovered, 0, maxReturned, recoveredAwareListener); 284 } 285 if (LOG.isTraceEnabled()) { 286 LOG.trace(key + " last recovered: " + lastRecovered); 287 } 288 //duration.end(); 289 } catch (SQLException e) { 290 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 291 } finally { 292 c.close(); 293 } 294 } 295 296 public void resetBatching(String clientId, String subscriptionName) { 297 String key = getSubscriptionKey(clientId, subscriptionName); 298 if (!pendingCompletion.contains(key)) { 299 subscriberLastRecoveredMap.remove(key); 300 } else { 301 LOG.trace(this + ", skip resetBatch during pending completion for: " + key); 302 } 303 } 304 305 public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) { 306 final String key = getSubscriptionKey(clientId, subscriptionName); 307 LastRecovered recovered = new LastRecovered(); 308 recovered.perPriority[priority].recovered = sequenceId; 309 subscriberLastRecoveredMap.put(key, recovered); 310 pendingCompletion.add(key); 311 LOG.trace(this + ", pending completion: " + key + ", last: " + recovered); 312 } 313 314 public void complete(String clientId, String subscriptionName) { 315 pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName)); 316 LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName)); 317 } 318 319 @Override 320 protected void onAdd(Message message, long sequenceId, byte priority) { 321 // update last recovered state 322 for (LastRecovered last : subscriberLastRecoveredMap.values()) { 323 last.updateStored(sequenceId, priority); 324 } 325 sequenceIdCacheSizeLock.writeLock().lock(); 326 try { 327 sequenceIdCache.put(message.getMessageId(), new long[]{sequenceId, priority}); 328 } finally { 329 sequenceIdCacheSizeLock.writeLock().unlock(); 330 } 331 } 332 333 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 334 TransactionContext c = persistenceAdapter.getTransactionContext(); 335 try { 336 c = persistenceAdapter.getTransactionContext(); 337 adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages()); 338 } catch (SQLException e) { 339 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 340 throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e); 341 } finally { 342 c.close(); 343 } 344 } 345 346 /** 347 * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String, 348 * String) 349 */ 350 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 351 TransactionContext c = persistenceAdapter.getTransactionContext(); 352 try { 353 return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName); 354 } catch (SQLException e) { 355 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 356 throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e); 357 } finally { 358 c.close(); 359 } 360 } 361 362 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 363 TransactionContext c = persistenceAdapter.getTransactionContext(); 364 try { 365 adapter.doDeleteSubscription(c, destination, clientId, subscriptionName); 366 } catch (SQLException e) { 367 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 368 throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e); 369 } finally { 370 c.close(); 371 resetBatching(clientId, subscriptionName); 372 } 373 } 374 375 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 376 TransactionContext c = persistenceAdapter.getTransactionContext(); 377 try { 378 return adapter.doGetAllSubscriptions(c, destination); 379 } catch (SQLException e) { 380 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 381 throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e); 382 } finally { 383 c.close(); 384 } 385 } 386 387 public int getMessageCount(String clientId, String subscriberName) throws IOException { 388 //Duration duration = new Duration("getMessageCount"); 389 int result = 0; 390 TransactionContext c = persistenceAdapter.getTransactionContext(); 391 try { 392 result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); 393 } catch (SQLException e) { 394 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 395 throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); 396 } finally { 397 c.close(); 398 } 399 if (LOG.isTraceEnabled()) { 400 LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result); 401 } 402 //duration.end(); 403 return result; 404 } 405 406 protected String getSubscriptionKey(String clientId, String subscriberName) { 407 String result = clientId + ":"; 408 result += subscriberName != null ? subscriberName : "NOT_SET"; 409 return result; 410 } 411 412}