001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Arrays; 022import java.util.ArrayList; 023import java.util.LinkedList; 024import java.util.Map; 025import java.util.TreeMap; 026 027import org.apache.activemq.ActiveMQMessageAudit; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.command.MessageAck; 032import org.apache.activemq.command.MessageId; 033import org.apache.activemq.command.XATransactionId; 034import org.apache.activemq.store.AbstractMessageStore; 035import org.apache.activemq.store.IndexListener; 036import org.apache.activemq.store.MessageRecoveryListener; 037import org.apache.activemq.util.ByteSequence; 038import org.apache.activemq.util.ByteSequenceData; 039import org.apache.activemq.util.IOExceptionSupport; 040import org.apache.activemq.wireformat.WireFormat; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * 046 */ 047public class JDBCMessageStore extends AbstractMessageStore { 048 049 class Duration { 050 static final int LIMIT = 100; 051 final long start = System.currentTimeMillis(); 052 final String name; 053 054 Duration(String name) { 055 this.name = name; 056 } 057 void end() { 058 end(null); 059 } 060 void end(Object o) { 061 long duration = System.currentTimeMillis() - start; 062 063 if (duration > LIMIT) { 064 System.err.println(name + " took a long time: " + duration + "ms " + o); 065 } 066 } 067 } 068 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class); 069 protected final WireFormat wireFormat; 070 protected final JDBCAdapter adapter; 071 protected final JDBCPersistenceAdapter persistenceAdapter; 072 protected ActiveMQMessageAudit audit; 073 protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>(); 074 protected final TreeMap<Long, Message> rolledBackAcks = new TreeMap<Long, Message>(); 075 final long[] perPriorityLastRecovered = new long[10]; 076 077 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { 078 super(destination); 079 this.persistenceAdapter = persistenceAdapter; 080 this.adapter = adapter; 081 this.wireFormat = wireFormat; 082 this.audit = audit; 083 084 if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { 085 recordDestinationCreation(destination); 086 } 087 resetBatching(); 088 } 089 090 private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { 091 TransactionContext c = persistenceAdapter.getTransactionContext(); 092 try { 093 c = persistenceAdapter.getTransactionContext(); 094 if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) { 095 adapter.doRecordDestination(c, destination); 096 } 097 } catch (SQLException e) { 098 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 099 throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e); 100 } finally { 101 c.close(); 102 } 103 } 104 105 @Override 106 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 107 MessageId messageId = message.getMessageId(); 108 if (audit != null && audit.isDuplicate(message)) { 109 if (LOG.isDebugEnabled()) { 110 LOG.debug(destination.getPhysicalName() 111 + " ignoring duplicated (add) message, already stored: " 112 + messageId); 113 } 114 return; 115 } 116 117 // if xaXid present - this is a prepare - so we don't yet have an outcome 118 final XATransactionId xaXid = context != null ? context.getXid() : null; 119 120 // Serialize the Message.. 121 byte data[]; 122 try { 123 ByteSequence packet = wireFormat.marshal(message); 124 data = ByteSequenceData.toByteArray(packet); 125 } catch (IOException e) { 126 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 127 } 128 129 // Get a connection and insert the message into the DB. 130 TransactionContext c = persistenceAdapter.getTransactionContext(context); 131 long sequenceId; 132 synchronized (pendingAdditions) { 133 sequenceId = persistenceAdapter.getNextSequenceId(); 134 final long sequence = sequenceId; 135 message.getMessageId().setEntryLocator(sequence); 136 137 if (xaXid == null) { 138 pendingAdditions.add(sequence); 139 140 c.onCompletion(new Runnable() { 141 @Override 142 public void run() { 143 // jdbc close or jms commit - while futureOrSequenceLong==null ordered 144 // work will remain pending on the Queue 145 message.getMessageId().setFutureOrSequenceLong(sequence); 146 } 147 }); 148 149 if (indexListener != null) { 150 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 151 @Override 152 public void run() { 153 // cursor add complete 154 synchronized (pendingAdditions) { pendingAdditions.remove(sequence); } 155 } 156 })); 157 } else { 158 pendingAdditions.remove(sequence); 159 } 160 } 161 } 162 try { 163 adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(), 164 this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid); 165 } catch (SQLException e) { 166 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 167 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 168 } finally { 169 c.close(); 170 } 171 if (xaXid == null) { 172 onAdd(message, sequenceId, message.getPriority()); 173 } 174 } 175 176 // jdbc commit order is random with concurrent connections - limit scan to lowest pending 177 private long minPendingSequeunceId() { 178 synchronized (pendingAdditions) { 179 if (!pendingAdditions.isEmpty()) { 180 return pendingAdditions.get(0); 181 } else { 182 // nothing pending, ensure scan is limited to current state 183 return persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1; 184 } 185 } 186 } 187 188 @Override 189 public void updateMessage(Message message) throws IOException { 190 TransactionContext c = persistenceAdapter.getTransactionContext(); 191 try { 192 adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message))); 193 } catch (SQLException e) { 194 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 195 throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e); 196 } finally { 197 c.close(); 198 } 199 } 200 201 protected void onAdd(Message message, long sequenceId, byte priority) {} 202 203 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 204 // Get a connection and insert the message into the DB. 205 TransactionContext c = persistenceAdapter.getTransactionContext(context); 206 try { 207 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef); 208 } catch (SQLException e) { 209 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 210 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 211 } finally { 212 c.close(); 213 } 214 } 215 216 @Override 217 public Message getMessage(MessageId messageId) throws IOException { 218 // Get a connection and pull the message out of the DB 219 TransactionContext c = persistenceAdapter.getTransactionContext(); 220 try { 221 byte data[] = adapter.doGetMessage(c, messageId); 222 if (data == null) { 223 return null; 224 } 225 226 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data)); 227 return answer; 228 } catch (IOException e) { 229 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 230 } catch (SQLException e) { 231 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 232 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 233 } finally { 234 c.close(); 235 } 236 } 237 238 public String getMessageReference(MessageId messageId) throws IOException { 239 long id = messageId.getBrokerSequenceId(); 240 241 // Get a connection and pull the message out of the DB 242 TransactionContext c = persistenceAdapter.getTransactionContext(); 243 try { 244 return adapter.doGetMessageReference(c, id); 245 } catch (IOException e) { 246 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 247 } catch (SQLException e) { 248 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 249 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 250 } finally { 251 c.close(); 252 } 253 } 254 255 @Override 256 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 257 258 long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ? 259 (Long) ack.getLastMessageId().getFutureOrSequenceLong() : 260 persistenceAdapter.getStoreSequenceIdForMessageId(context, ack.getLastMessageId(), destination)[0]; 261 262 // Get a connection and remove the message from the DB 263 TransactionContext c = persistenceAdapter.getTransactionContext(context); 264 try { 265 adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null); 266 } catch (SQLException e) { 267 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 268 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e); 269 } finally { 270 c.close(); 271 } 272 } 273 274 @Override 275 public void recover(final MessageRecoveryListener listener) throws Exception { 276 277 // Get all the Message ids out of the database. 278 TransactionContext c = persistenceAdapter.getTransactionContext(); 279 try { 280 c = persistenceAdapter.getTransactionContext(); 281 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { 282 @Override 283 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 284 if (listener.hasSpace()) { 285 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 286 msg.getMessageId().setBrokerSequenceId(sequenceId); 287 return listener.recoverMessage(msg); 288 } else { 289 if (LOG.isTraceEnabled()) { 290 LOG.trace("Message recovery limit reached for MessageRecoveryListener"); 291 } 292 return false; 293 } 294 } 295 296 @Override 297 public boolean recoverMessageReference(String reference) throws Exception { 298 if (listener.hasSpace()) { 299 return listener.recoverMessageReference(new MessageId(reference)); 300 } else { 301 if (LOG.isTraceEnabled()) { 302 LOG.trace("Message recovery limit reached for MessageRecoveryListener"); 303 } 304 return false; 305 } 306 } 307 }); 308 } catch (SQLException e) { 309 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 310 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e); 311 } finally { 312 c.close(); 313 } 314 } 315 316 /** 317 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 318 */ 319 @Override 320 public void removeAllMessages(ConnectionContext context) throws IOException { 321 // Get a connection and remove the message from the DB 322 TransactionContext c = persistenceAdapter.getTransactionContext(context); 323 try { 324 adapter.doRemoveAllMessages(c, destination); 325 } catch (SQLException e) { 326 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 327 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e); 328 } finally { 329 c.close(); 330 } 331 } 332 333 public int getMessageCount() throws IOException { 334 int result = 0; 335 TransactionContext c = persistenceAdapter.getTransactionContext(); 336 try { 337 338 result = adapter.doGetMessageCount(c, destination); 339 340 } catch (SQLException e) { 341 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 342 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e); 343 } finally { 344 c.close(); 345 } 346 return result; 347 } 348 349 /** 350 * @param maxReturned 351 * @param listener 352 * @throws Exception 353 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, 354 * org.apache.activemq.store.MessageRecoveryListener) 355 */ 356 @Override 357 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { 358 TransactionContext c = persistenceAdapter.getTransactionContext(); 359 try { 360 if (LOG.isTraceEnabled()) { 361 LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId()); 362 } 363 364 maxReturned -= recoverRolledBackAcks(maxReturned, listener); 365 366 adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(), 367 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { 368 369 @Override 370 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 371 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 372 msg.getMessageId().setBrokerSequenceId(sequenceId); 373 msg.getMessageId().setFutureOrSequenceLong(sequenceId); 374 msg.getMessageId().setEntryLocator(sequenceId); 375 listener.recoverMessage(msg); 376 trackLastRecovered(sequenceId, msg.getPriority()); 377 return true; 378 } 379 380 @Override 381 public boolean recoverMessageReference(String reference) throws Exception { 382 if (listener.hasSpace()) { 383 listener.recoverMessageReference(new MessageId(reference)); 384 return true; 385 } 386 return false; 387 } 388 389 }); 390 } catch (SQLException e) { 391 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 392 } finally { 393 c.close(); 394 } 395 396 } 397 398 public void trackRollbackAck(Message message) { 399 synchronized (rolledBackAcks) { 400 rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message); 401 } 402 } 403 404 private int recoverRolledBackAcks(int max, MessageRecoveryListener listener) throws Exception { 405 int recovered = 0; 406 ArrayList<Long> toRemove = new ArrayList<Long>(); 407 synchronized (rolledBackAcks) { 408 if (!rolledBackAcks.isEmpty()) { 409 for ( Map.Entry<Long,Message> candidate : rolledBackAcks.entrySet()) { 410 if (candidate.getKey() <= lastRecovered(candidate.getValue().getPriority())) { 411 listener.recoverMessage(candidate.getValue()); 412 recovered++; 413 toRemove.add(candidate.getKey()); 414 if (recovered == max) { 415 break; 416 } 417 } else { 418 toRemove.add(candidate.getKey()); 419 } 420 } 421 for (Long key : toRemove) { 422 rolledBackAcks.remove(key); 423 } 424 } 425 } 426 return recovered; 427 } 428 429 private long lastRecovered(int priority) { 430 return perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0]; 431 } 432 433 private void trackLastRecovered(long sequenceId, int priority) { 434 perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId; 435 } 436 437 /** 438 * @see org.apache.activemq.store.MessageStore#resetBatching() 439 */ 440 @Override 441 public void resetBatching() { 442 if (LOG.isTraceEnabled()) { 443 LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered)); 444 } 445 setLastRecovered(-1); 446 } 447 448 private void setLastRecovered(long val) { 449 for (int i=0;i<perPriorityLastRecovered.length;i++) { 450 perPriorityLastRecovered[i] = val; 451 } 452 } 453 454 455 @Override 456 public void setBatch(MessageId messageId) { 457 if (LOG.isTraceEnabled()) { 458 LOG.trace(this + " setBatch: last recovered: " + Arrays.toString(perPriorityLastRecovered)); 459 } 460 try { 461 long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination); 462 setLastRecovered(storedValues[0]); 463 } catch (IOException ignoredAsAlreadyLogged) { 464 resetBatching(); 465 } 466 if (LOG.isTraceEnabled()) { 467 LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered)); 468 } 469 } 470 471 472 @Override 473 public void setPrioritizedMessages(boolean prioritizedMessages) { 474 super.setPrioritizedMessages(prioritizedMessages); 475 } 476 477 @Override 478 public String toString() { 479 return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size(); 480 } 481}