/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;

import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;

/**
 * The Topic is a destination that sends a copy of a message to every active
 * Subscription registered.
 */
public class Topic extends BaseDestination implements Task {
    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
    private final TopicMessageStore topicStore;
    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
    // Write lock: subscription recovery/activation. Read lock: dispatch and pending-message maintenance.
    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
    private final TaskRunner taskRunner;
    private final TaskRunnerFactory taskRunnerFactor;
    // Deferred sends queued by send() while memory usage is full; drained by iterate().
    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
        @Override
        public void run() {
            try {
                Topic.this.taskRunner.wakeup();
            } catch (InterruptedException e) {
                // Do not lose the interruption: restore the flag so the
                // owner of this thread can still observe it.
                Thread.currentThread().interrupt();
            }
        }
    };

    /**
     * Creates a topic backed by the given (possibly {@code null}) message store.
     *
     * @param brokerService the broker service, passed to the super class
     * @param destination the destination this topic represents
     * @param store the topic message store, may be {@code null}
     * @param parentStats the parent statistics, passed to the super class
     * @param taskFactory factory used to create this topic's task runner and
     *                    to execute the message expiry work
     * @throws Exception if the super class or task runner creation fails
     */
    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
        super(brokerService, store, destination, parentStats);
        this.topicStore = store;
        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
        this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
        this.taskRunnerFactor = taskFactory;
    }

    @Override
    public void initialize() throws Exception {
        super.initialize();
        // set non default subscription recovery policy (override policyEntries)
        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
            setAlwaysRetroactive(true);
        }
        if (store != null) {
            // AMQ-2586: Better to leave this stat at zero than to give the user
            // misleading metrics.
            // int messageCount = store.getMessageCount();
            // destinationStatistics.getMessages().setCount(messageCount);
        }
    }

    /**
     * @return a snapshot copy of the subscriptions currently registered as consumers
     */
    @Override
    public List<Subscription> getConsumers() {
        synchronized (consumers) {
            return new ArrayList<Subscription>(consumers);
        }
    }

    /**
     * Always succeeds; this implementation performs no locking.
     *
     * @param node the message reference (ignored)
     * @param sub the prospective lock owner (ignored)
     * @return always {@code true}
     */
    public boolean lock(MessageReference node, LockOwner sub) {
        return true;
    }

    /**
     * Registers a subscription with this topic. Non-durable subscriptions are
     * added to the consumer list (with retroactive recovery under the dispatch
     * write lock when requested); durable subscriptions are recorded in the
     * durable subscriber map and only added to the consumer list when active.
     *
     * @param context the connection context of the subscriber
     * @param sub the subscription to add
     * @throws Exception if adding or recovering the subscription fails
     */
    @Override
    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {

            // Do a retroactive recovery if needed.
            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {

                // synchronize with dispatch method so that no new messages are sent
                // while we are recovering a subscription to avoid out of order messages.
                dispatchLock.writeLock().lock();
                try {
                    boolean applyRecovery = false;
                    synchronized (consumers) {
                        if (!consumers.contains(sub)) {
                            sub.add(context, this);
                            consumers.add(sub);
                            applyRecovery = true;
                            super.addSubscription(context, sub);
                        }
                    }
                    if (applyRecovery) {
                        subscriptionRecoveryPolicy.recover(context, this, sub);
                    }
                } finally {
                    dispatchLock.writeLock().unlock();
                }

            } else {
                synchronized (consumers) {
                    if (!consumers.contains(sub)) {
                        sub.add(context, this);
                        consumers.add(sub);
                        super.addSubscription(context, sub);
                    }
                }
            }
        } else {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            super.addSubscription(context, sub);
            sub.add(context, this);
            if (dsub.isActive()) {
                synchronized (consumers) {
                    // Only add the durable subscription if no consumer with the
                    // same subscription key is already registered.
                    boolean hasSubscription = false;
                    for (Subscription currentSub : consumers) {
                        if (currentSub.getConsumerInfo().isDurable()) {
                            DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
                            if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
                                hasSubscription = true;
                                break;
                            }
                        }
                    }

                    if (!hasSubscription) {
                        consumers.add(sub);
                    }
                }
            }
            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
        }
    }

    /**
     * Removes a subscription from this topic. Only non-durable subscriptions
     * are removed from the consumer list here; {@code sub.remove} is invoked
     * in every case.
     *
     * @param context the connection context of the subscriber
     * @param sub the subscription to remove
     * @param lastDeliveredSequenceId passed through to the super class
     * @throws Exception if the removal fails
     */
    @Override
    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
        if (!sub.getConsumerInfo().isDurable()) {
            boolean removed = false;
            synchronized (consumers) {
                removed = consumers.remove(sub);
            }
            if (removed) {
                super.removeSubscription(context, sub, lastDeliveredSequenceId);
            }
        }
        sub.remove(context, this);
    }

    /**
     * Deletes a durable subscription from the store and from this topic.
     * Does nothing when this topic has no message store.
     *
     * @param context the connection context (unused here)
     * @param key identifies the durable subscription to delete
     * @throws Exception if the store or the deactivation fails
     */
    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
        if (topicStore != null) {
            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
            DurableTopicSubscription removed = durableSubscribers.remove(key);
            if (removed != null) {
                destinationStatistics.getConsumers().decrement();
                // deactivate and remove
                removed.deactivate(false, 0L);
                consumers.remove(removed);
            }
        }
    }

    /**
     * Activates a durable subscription: reconciles it with the subscription
     * info held by the store (re-creating it when the selector changed or it
     * does not exist yet) and, if required, recovers its stored messages.
     * Does nothing when this topic has no message store.
     *
     * @param context the connection context (unused here)
     * @param subscription the durable subscription being activated
     * @throws Exception if the store lookup, update or recovery fails
     */
    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
        // synchronize with dispatch method so that no new messages are sent
        // while we are recovering a subscription to avoid out of order messages.
        dispatchLock.writeLock().lock();
        try {

            if (topicStore == null) {
                return;
            }

            // Recover the durable subscription.
            String clientId = subscription.getSubscriptionKey().getClientId();
            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
            String selector = subscription.getConsumerInfo().getSelector();
            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
            if (info != null) {
                // Check to see if selector changed.
                String s1 = info.getSelector();
                if ((s1 == null ^ selector == null) || (s1 != null && !s1.equals(selector))) {
                    // Need to delete the subscription
                    topicStore.deleteSubscription(clientId, subscriptionName);
                    info = null;
                    synchronized (consumers) {
                        consumers.remove(subscription);
                    }
                } else {
                    synchronized (consumers) {
                        if (!consumers.contains(subscription)) {
                            consumers.add(subscription);
                        }
                    }
                }
            }

            // Do we need to create the subscription?
            if (info == null) {
                info = new SubscriptionInfo();
                info.setClientId(clientId);
                info.setSelector(selector);
                info.setSubscriptionName(subscriptionName);
                // This destination is an actual destination id.
                info.setDestination(getActiveMQDestination());
                // This destination might be a pattern
                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
                synchronized (consumers) {
                    consumers.add(subscription);
                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
                }
            }

            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
            msgContext.setDestination(destination);
            if (subscription.isRecoveryRequired()) {
                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        message.setRegionDestination(Topic.this);
                        try {
                            msgContext.setMessageReference(message);
                            if (subscription.matches(message, msgContext)) {
                                subscription.add(message);
                            }
                        } catch (IOException e) {
                            LOG.error("Failed to recover this message {}", message, e);
                        }
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        throw new RuntimeException("Should not be called.");
                    }

                    @Override
                    public boolean hasSpace() {
                        return true;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }

                    @Override
                    public boolean canRecoveryNextMessage() {
                        return true;
                    }
                });
            }
        } finally {
            dispatchLock.writeLock().unlock();
        }
    }

    /**
     * Deactivates a durable subscription: removes it from the consumer list
     * and hands the dispatched messages to {@code sub.remove}.
     *
     * @param context the connection context
     * @param sub the durable subscription being deactivated
     * @param dispatched the message references passed on to the subscription
     * @throws Exception if the subscription removal fails
     */
    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
        synchronized (consumers) {
            consumers.remove(sub);
        }
        sub.remove(context, this, dispatched);
    }

    /**
     * Runs the subscription recovery policy for the given subscription, but
     * only when its consumer info is flagged retroactive.
     *
     * @param context the connection context
     * @param subscription the subscription to recover messages for
     * @throws Exception if the recovery policy fails
     */
    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
        if (subscription.getConsumerInfo().isRetroactive()) {
            subscriptionRecoveryPolicy.recover(context, this, subscription);
        }
    }

    /**
     * Sends a message to this topic. Expired messages are reported and
     * dropped. When memory usage is full and producer flow control applies,
     * the send is either rejected, queued until space is available, or this
     * thread blocks until space is available, depending on configuration and
     * on how the producer sends.
     *
     * @param producerExchange the exchange of the sending producer
     * @param message the message to send
     * @throws Exception if the send is rejected or fails
     */
    @Override
    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();

        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
        producerExchange.incrementSend();
        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                && !context.isInRecoveryMode();

        message.setRegionDestination(this);

        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        if (message.isExpired()) {
            broker.messageExpired(context, message, null);
            getDestinationStatistics().getExpired().increment();
            if (sendProducerAck) {
                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                context.getConnection().dispatchAsync(ack);
            }
            return;
        }

        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);

            if (isProducerFlowControl() && context.isProducerFlowControl()) {

                if (isFlowControlLogRequired()) {
                    LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                } else {
                    LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
                }

                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is sending a sync message or
                // if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                    synchronized (messagesWaitingForSpace) {
                        messagesWaitingForSpace.add(new Runnable() {
                            @Override
                            public void run() {
                                try {

                                    // While waiting for space to free up...
                                    // the transaction may be done
                                    if (message.isInTransaction()) {
                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
                                            throw new JMSException("Send transaction completed while waiting for space");
                                        }
                                    }

                                    // the message may have expired.
                                    if (message.isExpired()) {
                                        broker.messageExpired(context, message, null);
                                        getDestinationStatistics().getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchange, message);
                                    }

                                    if (sendProducerAck) {
                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
                                                .getSize());
                                        context.getConnection().dispatchAsync(ack);
                                    } else {
                                        Response response = new Response();
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
                                        ExceptionResponse response = new ExceptionResponse(e);
                                        response.setCorrelationId(message.getCommandId());
                                        context.getConnection().dispatchAsync(response);
                                    }
                                }
                            }
                        });

                        registerCallbackForNotFullNotification();
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {
                    // Producer flow control cannot be used, so we have do the flow control
                    // at the broker by blocking this thread until there is space available.

                    if (memoryUsage.isFull()) {
                        if (context.isInTransaction()) {

                            int count = 0;
                            while (!memoryUsage.waitForSpace(1000)) {
                                if (context.getStopping().get()) {
                                    throw new IOException("Connection closed, send aborted.");
                                }
                                if (count > 2 && context.isInTransaction()) {
                                    count = 0;
                                    int size = context.getTransaction().size();
                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
                                }
                                count++;
                            }
                        } else {
                            waitForSpace(
                                    context,
                                    producerExchange,
                                    memoryUsage,
                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
                                            + message.getProducerId()
                                            + ") to prevent flooding "
                                            + getActiveMQDestination().getQualifiedName()
                                            + "."
                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                        }
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    if (message.isExpired()) {
                        getDestinationStatistics().getExpired().increment();
                        LOG.debug("Expired message: {}", message);
                        return;
                    }
                }
            }
        }

        doMessageSend(producerExchange, message);
        messageDelivered(context, message);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
    }

    /**
     * do send the message - this needs to be synchronized to ensure messages
     * are stored AND dispatched in the right order
     *
     * @param producerExchange the exchange of the sending producer
     * @param message the message to store (when persistent and required) and dispatch
     * @throws Exception if the store is full and the send must fail, or if
     *                   storing or dispatching the message fails
     */
    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
            throws Exception {
        final ConnectionContext context = producerExchange.getConnectionContext();
        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
        Future<Object> result = null;

        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
                    throw new javax.jms.ResourceAllocationException(logMessage);
                }

                waitForSpace(context, producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
            }
            result = topicStore.asyncAddTopicMessage(context, message, isOptimizeStorage());
        }

        // Hold a reference until the message has been dispatched (or the
        // transaction has completed); every path below releases it again.
        message.incrementReferenceCount();

        if (context.isInTransaction()) {
            context.getTransaction().addSynchronization(new Synchronization() {
                @Override
                public void afterCommit() throws Exception {
                    // It could take while before we receive the commit
                    // operation.. by that time the message could have
                    // expired..
                    if (message.isExpired()) {
                        if (broker.isExpired(message)) {
                            getDestinationStatistics().getExpired().increment();
                            broker.messageExpired(context, message, null);
                        }
                        message.decrementReferenceCount();
                        return;
                    }
                    try {
                        dispatch(context, message);
                    } finally {
                        message.decrementReferenceCount();
                    }
                }

                @Override
                public void afterRollback() throws Exception {
                    message.decrementReferenceCount();
                }
            });

        } else {
            try {
                dispatch(context, message);
            } finally {
                message.decrementReferenceCount();
            }
        }

        if (result != null && !result.isCancelled()) {
            try {
                result.get();
            } catch (CancellationException e) {
                // ignore - the task has been cancelled if the message
                // has already been deleted
            }
        }
    }

    /**
     * @return {@code true} when there are no durable subscribers, in which
     *         case {@link #doMessageSend} skips the message store
     */
    private boolean canOptimizeOutPersistence() {
        return durableSubscribers.size() == 0;
    }

    @Override
    public String toString() {
        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
    }

    /**
     * Acknowledges a message. When a store is present and the message is
     * persistent, the acknowledgement is recorded in the store under the
     * durable subscription's key; the subscription is cast to
     * {@link DurableTopicSubscription} in that case.
     *
     * @param context the connection context
     * @param sub the acknowledging subscription
     * @param ack the acknowledgement
     * @param node the acknowledged message reference
     * @throws IOException if the store acknowledgement fails
     */
    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
            final MessageReference node) throws IOException {
        if (topicStore != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            SubscriptionKey key = dsub.getSubscriptionKey();
            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
                    convertToNonRangedAck(ack, node));
        }
        messageConsumed(context, node);
    }

    @Override
    public void gc() {
    }

    /**
     * Loads a message from the store.
     *
     * @param messageId the id of the message to load
     * @return the stored message, or {@code null} when this topic has no store
     * @throws IOException if the store lookup fails
     */
    public Message loadMessage(MessageId messageId) throws IOException {
        return topicStore != null ? topicStore.getMessage(messageId) : null;
    }

    @Override
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            this.subscriptionRecoveryPolicy.start();
            if (memoryUsage != null) {
                memoryUsage.start();
            }

            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
            }
        }
    }

    @Override
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            if (taskRunner != null) {
                taskRunner.shutdown();
            }
            this.subscriptionRecoveryPolicy.stop();
            if (memoryUsage != null) {
                memoryUsage.stop();
            }
            if (this.topicStore != null) {
                this.topicStore.stop();
            }

            scheduler.cancel(expireMessagesTask);
        }
    }

    /**
     * @return the messages collected by browsing this topic, limited by
     *         {@code getMaxBrowsePageSize()}
     */
    @Override
    public Message[] browse() {
        final List<Message> result = new ArrayList<Message>();
        doBrowse(result, getMaxBrowsePageSize());
        return result.toArray(new Message[result.size()]);
    }

    /**
     * Collects messages from the store and the subscription recovery policy
     * into {@code browseList}. Expired messages found in the store are
     * additionally expired for every inactive durable subscriber. Failures
     * are logged and not propagated. Does nothing without a store.
     *
     * @param browseList receives the browsed messages
     * @param max the size at which the listener reports no more space
     */
    private void doBrowse(final List<Message> browseList, final int max) {
        try {
            if (topicStore != null) {
                final List<Message> toExpire = new ArrayList<Message>();
                topicStore.recover(new MessageRecoveryListener() {
                    @Override
                    public boolean recoverMessage(Message message) throws Exception {
                        if (message.isExpired()) {
                            toExpire.add(message);
                        }
                        browseList.add(message);
                        return true;
                    }

                    @Override
                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                        return true;
                    }

                    @Override
                    public boolean hasSpace() {
                        return browseList.size() < max;
                    }

                    @Override
                    public boolean isDuplicate(MessageId id) {
                        return false;
                    }

                    @Override
                    public boolean canRecoveryNextMessage() {
                        return true;
                    }
                });
                final ConnectionContext connectionContext = createConnectionContext();
                for (Message message : toExpire) {
                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
                        if (!sub.isActive()) {
                            message.setRegionDestination(this);
                            messageExpired(connectionContext, sub, message);
                        }
                    }
                }
                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                if (msgs != null) {
                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
                        browseList.add(msgs[i]);
                    }
                }
            }
        } catch (Throwable e) {
            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
        }
    }

    /**
     * Runs queued sends while memory usage is not full; if sends remain
     * queued afterwards, re-registers for the "not full" notification.
     *
     * @return always {@code false}
     */
    @Override
    public boolean iterate() {
        synchronized (messagesWaitingForSpace) {
            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
                Runnable op = messagesWaitingForSpace.removeFirst();
                op.run();
            }

            if (!messagesWaitingForSpace.isEmpty()) {
                registerCallbackForNotFullNotification();
            }
        }
        return false;
    }

    private void registerCallbackForNotFullNotification() {
        // If the usage manager is not full, then the task will not
        // get called..
        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
            // so call it directly here.
            sendMessagesWaitingForSpaceTask.run();
        }
    }

    // Properties
    // -------------------------------------------------------------------------

    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
        return subscriptionRecoveryPolicy;
    }

    /**
     * Sets the subscription recovery policy. When the current policy is a
     * {@link RetainedMessageSubscriptionRecoveryPolicy}, the given policy is
     * wrapped by it instead of replacing it.
     *
     * @param recoveryPolicy the policy to install or wrap
     */
    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
            // allow users to combine retained message policy with other ActiveMQ policies
            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
            policy.setWrapped(recoveryPolicy);
        } else {
            this.subscriptionRecoveryPolicy = recoveryPolicy;
        }
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    @Override
    public final void wakeup() {
    }

    /**
     * Dispatches a message to the consumers under the dispatch read lock.
     * The message is first offered to the subscription recovery policy; if
     * the policy rejects it nothing is dispatched. When there are no
     * consumers, or the dispatch policy reports no dispatch,
     * {@code onMessageWithNoConsumers} is invoked.
     *
     * @param context the connection context of the sender
     * @param message the message to dispatch
     * @throws Exception if the policies or the no-consumer handling fail
     */
    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().increment();
        destinationStatistics.getEnqueues().increment();
        destinationStatistics.getMessageSize().addSize(message.getSize());
        MessageEvaluationContext msgContext = null;

        dispatchLock.readLock().lock();
        try {
            if (!subscriptionRecoveryPolicy.add(context, message)) {
                return;
            }
            synchronized (consumers) {
                if (consumers.isEmpty()) {
                    onMessageWithNoConsumers(context, message);
                    return;
                }
            }
            msgContext = context.getMessageEvaluationContext();
            msgContext.setDestination(destination);
            msgContext.setMessageReference(message);
            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
                onMessageWithNoConsumers(context, message);
            }

        } finally {
            dispatchLock.readLock().unlock();
            if (msgContext != null) {
                msgContext.clear();
            }
        }
    }

    // Guards against overlapping expiry scans: set by expireMessagesTask,
    // cleared by expireMessagesWork when the scan is done.
    private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
    private final Runnable expireMessagesWork = new Runnable() {
        @Override
        public void run() {
            try {
                List<Message> browsedMessages = new InsertionCountList<Message>();
                doBrowse(browsedMessages, getMaxExpirePageSize());
            } finally {
                // Always clear the flag, otherwise an unexpected failure would
                // prevent every later expiry scan from being scheduled.
                expiryTaskInProgress.set(false);
            }
        }
    };
    private final Runnable expireMessagesTask = new Runnable() {
        @Override
        public void run() {
            if (expiryTaskInProgress.compareAndSet(false, true)) {
                taskRunnerFactor.execute(expireMessagesWork);
            }
        }
    };

    /**
     * Reports an expired message to the broker, updates the expired
     * statistic and acknowledges the message on behalf of the subscription.
     * Failures while acknowledging are logged and not propagated.
     *
     * @param context the connection context
     * @param subs the subscription the message expired for
     * @param reference the expired message reference
     */
    @Override
    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
        broker.messageExpired(context, reference, subs);
        // AMQ-2586: Better to leave this stat at zero than to give the user
        // misleading metrics.
        // destinationStatistics.getMessages().decrement();
        destinationStatistics.getExpired().increment();
        MessageAck ack = new MessageAck();
        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
        ack.setDestination(destination);
        ack.setMessageID(reference.getMessageId());
        try {
            if (subs instanceof DurableTopicSubscription) {
                ((DurableTopicSubscription) subs).removePending(reference);
            }
            acknowledge(context, subs, ack, reference);
        } catch (Exception e) {
            LOG.error("Failed to remove expired Message from the store ", e);
        }
    }

    @Override
    protected Logger getLog() {
        return LOG;
    }

    /**
     * @return {@code true} only when message storage optimization is enabled,
     *         at least one durable subscriber exists, and every durable
     *         subscriber is active, has a non-zero prefetch size, is not a
     *         slow consumer and has in-flight usage within the configured limit
     */
    protected boolean isOptimizeStorage() {
        boolean result = false;

        if (isDoOptimzeMessageStorage() && !durableSubscribers.isEmpty()) {
            result = true;
            for (DurableTopicSubscription s : durableSubscribers.values()) {
                if (!s.isActive()) {
                    result = false;
                    break;
                }
                if (s.getPrefetchSize() == 0) {
                    result = false;
                    break;
                }
                if (s.isSlowConsumer()) {
                    result = false;
                    break;
                }
                if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()) {
                    result = false;
                    break;
                }
            }
        }
        return result;
    }

    /**
     * force a reread of the store - after transaction recovery completion
     *
     * @param pendingAdditionsCount not used by this implementation
     */
    @Override
    public void clearPendingMessages(int pendingAdditionsCount) {
        dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
                clearPendingAndDispatch(durableTopicSubscription);
            }
        } finally {
            dispatchLock.readLock().unlock();
        }
    }

    /**
     * Clears the pending messages of the given durable subscription and
     * triggers a dispatch, all while holding the subscription's pending lock.
     * A dispatch failure is logged and not propagated.
     */
    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
        synchronized (durableTopicSubscription.pendingLock) {
            durableTopicSubscription.pending.clear();
            try {
                durableTopicSubscription.dispatchPending();
            } catch (IOException exception) {
                // One argument array, throwable last: the three placeholders are
                // filled and the exception is logged with its stack trace.
                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
                        durableTopicSubscription,
                        destination,
                        durableTopicSubscription.pending,
                        exception });
            }
        }
    }

    /**
     * Rolls back the given message id on the pending cursor of every durable
     * subscriber, under the dispatch read lock.
     */
    private void rollback(MessageId poisoned) {
        dispatchLock.readLock().lock();
        try {
            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
                durableTopicSubscription.getPending().rollback(poisoned);
            }
        } finally {
            dispatchLock.readLock().unlock();
        }
    }

    /**
     * @return the live map of durable subscriptions keyed by subscription key
     *         (not a copy)
     */
    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
        return durableSubscribers;
    }
}