001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.region.policy.DispatchPolicy; 036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 040import org.apache.activemq.broker.util.InsertionCountList; 041import org.apache.activemq.command.ActiveMQDestination; 042import org.apache.activemq.command.ExceptionResponse; 043import org.apache.activemq.command.Message; 044import org.apache.activemq.command.MessageAck; 045import org.apache.activemq.command.MessageId; 046import org.apache.activemq.command.ProducerAck; 047import org.apache.activemq.command.ProducerInfo; 048import org.apache.activemq.command.Response; 049import org.apache.activemq.command.SubscriptionInfo; 050import org.apache.activemq.filter.MessageEvaluationContext; 051import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 052import org.apache.activemq.store.MessageRecoveryListener; 053import org.apache.activemq.store.TopicMessageStore; 054import org.apache.activemq.thread.Task; 055import org.apache.activemq.thread.TaskRunner; 056import org.apache.activemq.thread.TaskRunnerFactory; 057import org.apache.activemq.transaction.Synchronization; 058import org.apache.activemq.util.SubscriptionKey; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * The Topic is a destination that sends a copy of a message to every active 064 * Subscription registered. 065 */ 066public class Topic extends BaseDestination implements Task { 067 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 068 private final TopicMessageStore topicStore; 069 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 070 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 071 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 072 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 073 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 074 private final TaskRunner taskRunner; 075 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 076 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 077 @Override 078 public void run() { 079 try { 080 Topic.this.taskRunner.wakeup(); 081 } catch (InterruptedException e) { 082 } 083 } 084 }; 085 086 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 087 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 088 super(brokerService, store, destination, parentStats); 089 this.topicStore = store; 090 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 091 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 092 } 093 094 @Override 095 public void initialize() throws Exception { 096 super.initialize(); 097 // set non default subscription recovery policy (override policyEntries) 098 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 099 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 100 setAlwaysRetroactive(true); 101 } 102 if (store != null) { 103 // AMQ-2586: Better to leave this stat at zero than to give the user 104 // misleading metrics. 105 // int messageCount = store.getMessageCount(); 106 // destinationStatistics.getMessages().setCount(messageCount); 107 } 108 } 109 110 @Override 111 public List<Subscription> getConsumers() { 112 synchronized (consumers) { 113 return new ArrayList<Subscription>(consumers); 114 } 115 } 116 117 public boolean lock(MessageReference node, LockOwner sub) { 118 return true; 119 } 120 121 @Override 122 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 123 if (!sub.getConsumerInfo().isDurable()) { 124 125 // Do a retroactive recovery if needed. 126 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 127 128 // synchronize with dispatch method so that no new messages are sent 129 // while we are recovering a subscription to avoid out of order messages. 130 dispatchLock.writeLock().lock(); 131 try { 132 boolean applyRecovery = false; 133 synchronized (consumers) { 134 if (!consumers.contains(sub)){ 135 sub.add(context, this); 136 consumers.add(sub); 137 applyRecovery=true; 138 super.addSubscription(context, sub); 139 } 140 } 141 if (applyRecovery){ 142 subscriptionRecoveryPolicy.recover(context, this, sub); 143 } 144 } finally { 145 dispatchLock.writeLock().unlock(); 146 } 147 148 } else { 149 synchronized (consumers) { 150 if (!consumers.contains(sub)){ 151 sub.add(context, this); 152 consumers.add(sub); 153 super.addSubscription(context, sub); 154 } 155 } 156 } 157 } else { 158 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 159 super.addSubscription(context, sub); 160 sub.add(context, this); 161 if(dsub.isActive()) { 162 synchronized (consumers) { 163 boolean hasSubscription = false; 164 165 if (consumers.size() == 0) { 166 hasSubscription = false; 167 } else { 168 for (Subscription currentSub : consumers) { 169 if (currentSub.getConsumerInfo().isDurable()) { 170 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 171 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 172 hasSubscription = true; 173 break; 174 } 175 } 176 } 177 } 178 179 if (!hasSubscription) { 180 consumers.add(sub); 181 } 182 } 183 } 184 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 185 } 186 } 187 188 @Override 189 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 190 if (!sub.getConsumerInfo().isDurable()) { 191 boolean removed = false; 192 synchronized (consumers) { 193 removed = consumers.remove(sub); 194 } 195 if (removed) { 196 super.removeSubscription(context, sub, lastDeliveredSequenceId); 197 } 198 } 199 sub.remove(context, this); 200 } 201 202 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 203 if (topicStore != null) { 204 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 205 DurableTopicSubscription removed = durableSubscribers.remove(key); 206 if (removed != null) { 207 destinationStatistics.getConsumers().decrement(); 208 // deactivate and remove 209 removed.deactivate(false, 0l); 210 consumers.remove(removed); 211 } 212 } 213 } 214 215 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 216 // synchronize with dispatch method so that no new messages are sent 217 // while we are recovering a subscription to avoid out of order messages. 218 dispatchLock.writeLock().lock(); 219 try { 220 221 if (topicStore == null) { 222 return; 223 } 224 225 // Recover the durable subscription. 226 String clientId = subscription.getSubscriptionKey().getClientId(); 227 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 228 String selector = subscription.getConsumerInfo().getSelector(); 229 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 230 if (info != null) { 231 // Check to see if selector changed. 232 String s1 = info.getSelector(); 233 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 234 // Need to delete the subscription 235 topicStore.deleteSubscription(clientId, subscriptionName); 236 info = null; 237 synchronized (consumers) { 238 consumers.remove(subscription); 239 } 240 } else { 241 synchronized (consumers) { 242 if (!consumers.contains(subscription)) { 243 consumers.add(subscription); 244 } 245 } 246 } 247 } 248 249 // Do we need to create the subscription? 250 if (info == null) { 251 info = new SubscriptionInfo(); 252 info.setClientId(clientId); 253 info.setSelector(selector); 254 info.setSubscriptionName(subscriptionName); 255 info.setDestination(getActiveMQDestination()); 256 // This destination is an actual destination id. 257 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 258 // This destination might be a pattern 259 synchronized (consumers) { 260 consumers.add(subscription); 261 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 262 } 263 } 264 265 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 266 msgContext.setDestination(destination); 267 if (subscription.isRecoveryRequired()) { 268 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 269 @Override 270 public boolean recoverMessage(Message message) throws Exception { 271 message.setRegionDestination(Topic.this); 272 try { 273 msgContext.setMessageReference(message); 274 if (subscription.matches(message, msgContext)) { 275 subscription.add(message); 276 } 277 } catch (IOException e) { 278 LOG.error("Failed to recover this message {}", message, e); 279 } 280 return true; 281 } 282 283 @Override 284 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 285 throw new RuntimeException("Should not be called."); 286 } 287 288 @Override 289 public boolean hasSpace() { 290 return true; 291 } 292 293 @Override 294 public boolean isDuplicate(MessageId id) { 295 return false; 296 } 297 }); 298 } 299 } finally { 300 dispatchLock.writeLock().unlock(); 301 } 302 } 303 304 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 305 synchronized (consumers) { 306 consumers.remove(sub); 307 } 308 sub.remove(context, this, dispatched); 309 } 310 311 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 312 if (subscription.getConsumerInfo().isRetroactive()) { 313 subscriptionRecoveryPolicy.recover(context, this, subscription); 314 } 315 } 316 317 @Override 318 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 319 final ConnectionContext context = producerExchange.getConnectionContext(); 320 321 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 322 producerExchange.incrementSend(); 323 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 324 && !context.isInRecoveryMode(); 325 326 message.setRegionDestination(this); 327 328 // There is delay between the client sending it and it arriving at the 329 // destination.. it may have expired. 330 if (message.isExpired()) { 331 broker.messageExpired(context, message, null); 332 getDestinationStatistics().getExpired().increment(); 333 if (sendProducerAck) { 334 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 335 context.getConnection().dispatchAsync(ack); 336 } 337 return; 338 } 339 340 if (memoryUsage.isFull()) { 341 isFull(context, memoryUsage); 342 fastProducer(context, producerInfo); 343 344 if (isProducerFlowControl() && context.isProducerFlowControl()) { 345 346 if (isFlowControlLogRequired()) { 347 LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 348 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 349 } else { 350 LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 351 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 352 } 353 354 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 355 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 356 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 357 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 358 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 359 } 360 361 // We can avoid blocking due to low usage if the producer is sending a sync message or 362 // if it is using a producer window 363 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 364 synchronized (messagesWaitingForSpace) { 365 messagesWaitingForSpace.add(new Runnable() { 366 @Override 367 public void run() { 368 try { 369 370 // While waiting for space to free up... the 371 // message may have expired. 372 if (message.isExpired()) { 373 broker.messageExpired(context, message, null); 374 getDestinationStatistics().getExpired().increment(); 375 } else { 376 doMessageSend(producerExchange, message); 377 } 378 379 if (sendProducerAck) { 380 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 381 .getSize()); 382 context.getConnection().dispatchAsync(ack); 383 } else { 384 Response response = new Response(); 385 response.setCorrelationId(message.getCommandId()); 386 context.getConnection().dispatchAsync(response); 387 } 388 389 } catch (Exception e) { 390 if (!sendProducerAck && !context.isInRecoveryMode()) { 391 ExceptionResponse response = new ExceptionResponse(e); 392 response.setCorrelationId(message.getCommandId()); 393 context.getConnection().dispatchAsync(response); 394 } 395 } 396 } 397 }); 398 399 registerCallbackForNotFullNotification(); 400 context.setDontSendReponse(true); 401 return; 402 } 403 404 } else { 405 // Producer flow control cannot be used, so we have do the flow control 406 // at the broker by blocking this thread until there is space available. 407 408 if (memoryUsage.isFull()) { 409 if (context.isInTransaction()) { 410 411 int count = 0; 412 while (!memoryUsage.waitForSpace(1000)) { 413 if (context.getStopping().get()) { 414 throw new IOException("Connection closed, send aborted."); 415 } 416 if (count > 2 && context.isInTransaction()) { 417 count = 0; 418 int size = context.getTransaction().size(); 419 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 420 } 421 count++; 422 } 423 } else { 424 waitForSpace( 425 context, 426 producerExchange, 427 memoryUsage, 428 "Usage Manager Memory Usage limit reached. Stopping producer (" 429 + message.getProducerId() 430 + ") to prevent flooding " 431 + getActiveMQDestination().getQualifiedName() 432 + "." 433 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 434 } 435 } 436 437 // The usage manager could have delayed us by the time 438 // we unblock the message could have expired.. 439 if (message.isExpired()) { 440 getDestinationStatistics().getExpired().increment(); 441 LOG.debug("Expired message: {}", message); 442 return; 443 } 444 } 445 } 446 } 447 448 doMessageSend(producerExchange, message); 449 messageDelivered(context, message); 450 if (sendProducerAck) { 451 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 452 context.getConnection().dispatchAsync(ack); 453 } 454 } 455 456 /** 457 * do send the message - this needs to be synchronized to ensure messages 458 * are stored AND dispatched in the right order 459 * 460 * @param producerExchange 461 * @param message 462 * @throws IOException 463 * @throws Exception 464 */ 465 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 466 throws IOException, Exception { 467 final ConnectionContext context = producerExchange.getConnectionContext(); 468 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 469 Future<Object> result = null; 470 471 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 472 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 473 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 474 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 475 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 476 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 477 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 478 throw new javax.jms.ResourceAllocationException(logMessage); 479 } 480 481 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 482 } 483 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 484 } 485 486 message.incrementReferenceCount(); 487 488 if (context.isInTransaction()) { 489 context.getTransaction().addSynchronization(new Synchronization() { 490 @Override 491 public void afterCommit() throws Exception { 492 // It could take while before we receive the commit 493 // operation.. by that time the message could have 494 // expired.. 495 if (message.isExpired()) { 496 if (broker.isExpired(message)) { 497 getDestinationStatistics().getExpired().increment(); 498 broker.messageExpired(context, message, null); 499 } 500 message.decrementReferenceCount(); 501 return; 502 } 503 try { 504 dispatch(context, message); 505 } finally { 506 message.decrementReferenceCount(); 507 } 508 } 509 510 @Override 511 public void afterRollback() throws Exception { 512 message.decrementReferenceCount(); 513 } 514 }); 515 516 } else { 517 try { 518 dispatch(context, message); 519 } finally { 520 message.decrementReferenceCount(); 521 } 522 } 523 524 if (result != null && !result.isCancelled()) { 525 try { 526 result.get(); 527 } catch (CancellationException e) { 528 // ignore - the task has been cancelled if the message 529 // has already been deleted 530 } 531 } 532 } 533 534 private boolean canOptimizeOutPersistence() { 535 return durableSubscribers.size() == 0; 536 } 537 538 @Override 539 public String toString() { 540 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 541 } 542 543 @Override 544 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 545 final MessageReference node) throws IOException { 546 if (topicStore != null && node.isPersistent()) { 547 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 548 SubscriptionKey key = dsub.getSubscriptionKey(); 549 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 550 convertToNonRangedAck(ack, node)); 551 } 552 messageConsumed(context, node); 553 } 554 555 @Override 556 public void gc() { 557 } 558 559 public Message loadMessage(MessageId messageId) throws IOException { 560 return topicStore != null ? topicStore.getMessage(messageId) : null; 561 } 562 563 @Override 564 public void start() throws Exception { 565 if (started.compareAndSet(false, true)) { 566 this.subscriptionRecoveryPolicy.start(); 567 if (memoryUsage != null) { 568 memoryUsage.start(); 569 } 570 571 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 572 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 573 } 574 } 575 } 576 577 @Override 578 public void stop() throws Exception { 579 if (started.compareAndSet(true, false)) { 580 if (taskRunner != null) { 581 taskRunner.shutdown(); 582 } 583 this.subscriptionRecoveryPolicy.stop(); 584 if (memoryUsage != null) { 585 memoryUsage.stop(); 586 } 587 if (this.topicStore != null) { 588 this.topicStore.stop(); 589 } 590 591 scheduler.cancel(expireMessagesTask); 592 } 593 } 594 595 @Override 596 public Message[] browse() { 597 final List<Message> result = new ArrayList<Message>(); 598 doBrowse(result, getMaxBrowsePageSize()); 599 return result.toArray(new Message[result.size()]); 600 } 601 602 private void doBrowse(final List<Message> browseList, final int max) { 603 try { 604 if (topicStore != null) { 605 final List<Message> toExpire = new ArrayList<Message>(); 606 topicStore.recover(new MessageRecoveryListener() { 607 @Override 608 public boolean recoverMessage(Message message) throws Exception { 609 if (message.isExpired()) { 610 toExpire.add(message); 611 } 612 browseList.add(message); 613 return true; 614 } 615 616 @Override 617 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 618 return true; 619 } 620 621 @Override 622 public boolean hasSpace() { 623 return browseList.size() < max; 624 } 625 626 @Override 627 public boolean isDuplicate(MessageId id) { 628 return false; 629 } 630 }); 631 final ConnectionContext connectionContext = createConnectionContext(); 632 for (Message message : toExpire) { 633 for (DurableTopicSubscription sub : durableSubscribers.values()) { 634 if (!sub.isActive()) { 635 message.setRegionDestination(this); 636 messageExpired(connectionContext, sub, message); 637 } 638 } 639 } 640 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 641 if (msgs != null) { 642 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 643 browseList.add(msgs[i]); 644 } 645 } 646 } 647 } catch (Throwable e) { 648 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 649 } 650 } 651 652 @Override 653 public boolean iterate() { 654 synchronized (messagesWaitingForSpace) { 655 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 656 Runnable op = messagesWaitingForSpace.removeFirst(); 657 op.run(); 658 } 659 660 if (!messagesWaitingForSpace.isEmpty()) { 661 registerCallbackForNotFullNotification(); 662 } 663 } 664 return false; 665 } 666 667 private void registerCallbackForNotFullNotification() { 668 // If the usage manager is not full, then the task will not 669 // get called.. 670 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 671 // so call it directly here. 672 sendMessagesWaitingForSpaceTask.run(); 673 } 674 } 675 676 // Properties 677 // ------------------------------------------------------------------------- 678 679 public DispatchPolicy getDispatchPolicy() { 680 return dispatchPolicy; 681 } 682 683 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 684 this.dispatchPolicy = dispatchPolicy; 685 } 686 687 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 688 return subscriptionRecoveryPolicy; 689 } 690 691 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 692 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 693 // allow users to combine retained message policy with other ActiveMQ policies 694 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 695 policy.setWrapped(recoveryPolicy); 696 } else { 697 this.subscriptionRecoveryPolicy = recoveryPolicy; 698 } 699 } 700 701 // Implementation methods 702 // ------------------------------------------------------------------------- 703 704 @Override 705 public final void wakeup() { 706 } 707 708 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 709 // AMQ-2586: Better to leave this stat at zero than to give the user 710 // misleading metrics. 711 // destinationStatistics.getMessages().increment(); 712 destinationStatistics.getEnqueues().increment(); 713 destinationStatistics.getMessageSize().addSize(message.getSize()); 714 MessageEvaluationContext msgContext = null; 715 716 dispatchLock.readLock().lock(); 717 try { 718 if (!subscriptionRecoveryPolicy.add(context, message)) { 719 return; 720 } 721 synchronized (consumers) { 722 if (consumers.isEmpty()) { 723 onMessageWithNoConsumers(context, message); 724 return; 725 } 726 } 727 msgContext = context.getMessageEvaluationContext(); 728 msgContext.setDestination(destination); 729 msgContext.setMessageReference(message); 730 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 731 onMessageWithNoConsumers(context, message); 732 } 733 734 } finally { 735 dispatchLock.readLock().unlock(); 736 if (msgContext != null) { 737 msgContext.clear(); 738 } 739 } 740 } 741 742 private final Runnable expireMessagesTask = new Runnable() { 743 @Override 744 public void run() { 745 List<Message> browsedMessages = new InsertionCountList<Message>(); 746 doBrowse(browsedMessages, getMaxExpirePageSize()); 747 } 748 }; 749 750 @Override 751 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 752 broker.messageExpired(context, reference, subs); 753 // AMQ-2586: Better to leave this stat at zero than to give the user 754 // misleading metrics. 755 // destinationStatistics.getMessages().decrement(); 756 destinationStatistics.getExpired().increment(); 757 MessageAck ack = new MessageAck(); 758 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 759 ack.setDestination(destination); 760 ack.setMessageID(reference.getMessageId()); 761 try { 762 if (subs instanceof DurableTopicSubscription) { 763 ((DurableTopicSubscription)subs).removePending(reference); 764 } 765 acknowledge(context, subs, ack, reference); 766 } catch (Exception e) { 767 LOG.error("Failed to remove expired Message from the store ", e); 768 } 769 } 770 771 @Override 772 protected Logger getLog() { 773 return LOG; 774 } 775 776 protected boolean isOptimizeStorage(){ 777 boolean result = false; 778 779 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 780 result = true; 781 for (DurableTopicSubscription s : durableSubscribers.values()) { 782 if (s.isActive()== false){ 783 result = false; 784 break; 785 } 786 if (s.getPrefetchSize()==0){ 787 result = false; 788 break; 789 } 790 if (s.isSlowConsumer()){ 791 result = false; 792 break; 793 } 794 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 795 result = false; 796 break; 797 } 798 } 799 } 800 return result; 801 } 802 803 /** 804 * force a reread of the store - after transaction recovery completion 805 * @param pendingAdditionsCount 806 */ 807 @Override 808 public void clearPendingMessages(int pendingAdditionsCount) { 809 dispatchLock.readLock().lock(); 810 try { 811 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 812 clearPendingAndDispatch(durableTopicSubscription); 813 } 814 } finally { 815 dispatchLock.readLock().unlock(); 816 } 817 } 818 819 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 820 synchronized (durableTopicSubscription.pendingLock) { 821 durableTopicSubscription.pending.clear(); 822 try { 823 durableTopicSubscription.dispatchPending(); 824 } catch (IOException exception) { 825 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 826 durableTopicSubscription, 827 destination, 828 durableTopicSubscription.pending }, exception); 829 } 830 } 831 } 832 833 private void rollback(MessageId poisoned) { 834 dispatchLock.readLock().lock(); 835 try { 836 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 837 durableTopicSubscription.getPending().rollback(poisoned); 838 } 839 } finally { 840 dispatchLock.readLock().unlock(); 841 } 842 } 843 844 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 845 return durableSubscribers; 846 } 847}