001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.locks.ReentrantReadWriteLock; 031 032import org.apache.activemq.advisory.AdvisorySupport; 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.ConnectionContext; 035import org.apache.activemq.broker.ProducerBrokerExchange; 036import org.apache.activemq.broker.region.policy.DispatchPolicy; 037import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 039import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 040import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 041import org.apache.activemq.broker.util.InsertionCountList; 042import org.apache.activemq.command.ActiveMQDestination; 043import org.apache.activemq.command.ExceptionResponse; 044import org.apache.activemq.command.Message; 045import org.apache.activemq.command.MessageAck; 046import org.apache.activemq.command.MessageId; 047import org.apache.activemq.command.ProducerAck; 048import org.apache.activemq.command.ProducerInfo; 049import org.apache.activemq.command.Response; 050import org.apache.activemq.command.SubscriptionInfo; 051import org.apache.activemq.filter.MessageEvaluationContext; 052import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 053import org.apache.activemq.store.MessageRecoveryListener; 054import org.apache.activemq.store.TopicMessageStore; 055import org.apache.activemq.thread.Task; 056import org.apache.activemq.thread.TaskRunner; 057import org.apache.activemq.thread.TaskRunnerFactory; 058import org.apache.activemq.transaction.Synchronization; 059import org.apache.activemq.util.SubscriptionKey; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * The Topic is a destination that sends a copy of a message to every active 065 * Subscription registered. 066 */ 067public class Topic extends BaseDestination implements Task { 068 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 069 private final TopicMessageStore topicStore; 070 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 071 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 072 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 073 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 074 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 075 private final TaskRunner taskRunner; 076 private final TaskRunnerFactory taskRunnerFactor; 077 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 078 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 079 @Override 080 public void run() { 081 try { 082 Topic.this.taskRunner.wakeup(); 083 } catch (InterruptedException e) { 084 } 085 } 086 }; 087 088 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 089 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 090 super(brokerService, store, destination, parentStats); 091 this.topicStore = store; 092 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 093 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 094 this.taskRunnerFactor = taskFactory; 095 } 096 097 @Override 098 public void initialize() throws Exception { 099 super.initialize(); 100 // set non default subscription recovery policy (override policyEntries) 101 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 102 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 103 setAlwaysRetroactive(true); 104 } 105 if (store != null) { 106 // AMQ-2586: Better to leave this stat at zero than to give the user 107 // misleading metrics. 108 // int messageCount = store.getMessageCount(); 109 // destinationStatistics.getMessages().setCount(messageCount); 110 } 111 } 112 113 @Override 114 public List<Subscription> getConsumers() { 115 synchronized (consumers) { 116 return new ArrayList<Subscription>(consumers); 117 } 118 } 119 120 public boolean lock(MessageReference node, LockOwner sub) { 121 return true; 122 } 123 124 @Override 125 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 126 if (!sub.getConsumerInfo().isDurable()) { 127 128 // Do a retroactive recovery if needed. 129 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 130 131 // synchronize with dispatch method so that no new messages are sent 132 // while we are recovering a subscription to avoid out of order messages. 133 dispatchLock.writeLock().lock(); 134 try { 135 boolean applyRecovery = false; 136 synchronized (consumers) { 137 if (!consumers.contains(sub)){ 138 sub.add(context, this); 139 consumers.add(sub); 140 applyRecovery=true; 141 super.addSubscription(context, sub); 142 } 143 } 144 if (applyRecovery){ 145 subscriptionRecoveryPolicy.recover(context, this, sub); 146 } 147 } finally { 148 dispatchLock.writeLock().unlock(); 149 } 150 151 } else { 152 synchronized (consumers) { 153 if (!consumers.contains(sub)){ 154 sub.add(context, this); 155 consumers.add(sub); 156 super.addSubscription(context, sub); 157 } 158 } 159 } 160 } else { 161 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 162 super.addSubscription(context, sub); 163 sub.add(context, this); 164 if(dsub.isActive()) { 165 synchronized (consumers) { 166 boolean hasSubscription = false; 167 168 if (consumers.size() == 0) { 169 hasSubscription = false; 170 } else { 171 for (Subscription currentSub : consumers) { 172 if (currentSub.getConsumerInfo().isDurable()) { 173 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 174 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 175 hasSubscription = true; 176 break; 177 } 178 } 179 } 180 } 181 182 if (!hasSubscription) { 183 consumers.add(sub); 184 } 185 } 186 } 187 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 188 } 189 } 190 191 @Override 192 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 193 if (!sub.getConsumerInfo().isDurable()) { 194 boolean removed = false; 195 synchronized (consumers) { 196 removed = consumers.remove(sub); 197 } 198 if (removed) { 199 super.removeSubscription(context, sub, lastDeliveredSequenceId); 200 } 201 } 202 sub.remove(context, this); 203 } 204 205 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 206 if (topicStore != null) { 207 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 208 DurableTopicSubscription removed = durableSubscribers.remove(key); 209 if (removed != null) { 210 destinationStatistics.getConsumers().decrement(); 211 // deactivate and remove 212 removed.deactivate(false, 0l); 213 consumers.remove(removed); 214 } 215 } 216 } 217 218 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 219 // synchronize with dispatch method so that no new messages are sent 220 // while we are recovering a subscription to avoid out of order messages. 221 dispatchLock.writeLock().lock(); 222 try { 223 224 if (topicStore == null) { 225 return; 226 } 227 228 // Recover the durable subscription. 229 String clientId = subscription.getSubscriptionKey().getClientId(); 230 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 231 String selector = subscription.getConsumerInfo().getSelector(); 232 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 233 if (info != null) { 234 // Check to see if selector changed. 235 String s1 = info.getSelector(); 236 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 237 // Need to delete the subscription 238 topicStore.deleteSubscription(clientId, subscriptionName); 239 info = null; 240 synchronized (consumers) { 241 consumers.remove(subscription); 242 } 243 } else { 244 synchronized (consumers) { 245 if (!consumers.contains(subscription)) { 246 consumers.add(subscription); 247 } 248 } 249 } 250 } 251 252 // Do we need to create the subscription? 253 if (info == null) { 254 info = new SubscriptionInfo(); 255 info.setClientId(clientId); 256 info.setSelector(selector); 257 info.setSubscriptionName(subscriptionName); 258 info.setDestination(getActiveMQDestination()); 259 // This destination is an actual destination id. 260 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 261 // This destination might be a pattern 262 synchronized (consumers) { 263 consumers.add(subscription); 264 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 265 } 266 } 267 268 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 269 msgContext.setDestination(destination); 270 if (subscription.isRecoveryRequired()) { 271 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 272 @Override 273 public boolean recoverMessage(Message message) throws Exception { 274 message.setRegionDestination(Topic.this); 275 try { 276 msgContext.setMessageReference(message); 277 if (subscription.matches(message, msgContext)) { 278 subscription.add(message); 279 } 280 } catch (IOException e) { 281 LOG.error("Failed to recover this message {}", message, e); 282 } 283 return true; 284 } 285 286 @Override 287 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 288 throw new RuntimeException("Should not be called."); 289 } 290 291 @Override 292 public boolean hasSpace() { 293 return true; 294 } 295 296 @Override 297 public boolean isDuplicate(MessageId id) { 298 return false; 299 } 300 }); 301 } 302 } finally { 303 dispatchLock.writeLock().unlock(); 304 } 305 } 306 307 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 308 synchronized (consumers) { 309 consumers.remove(sub); 310 } 311 sub.remove(context, this, dispatched); 312 } 313 314 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 315 if (subscription.getConsumerInfo().isRetroactive()) { 316 subscriptionRecoveryPolicy.recover(context, this, subscription); 317 } 318 } 319 320 @Override 321 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 322 final ConnectionContext context = producerExchange.getConnectionContext(); 323 324 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 325 producerExchange.incrementSend(); 326 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 327 && !context.isInRecoveryMode(); 328 329 message.setRegionDestination(this); 330 331 // There is delay between the client sending it and it arriving at the 332 // destination.. it may have expired. 333 if (message.isExpired()) { 334 broker.messageExpired(context, message, null); 335 getDestinationStatistics().getExpired().increment(); 336 if (sendProducerAck) { 337 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 338 context.getConnection().dispatchAsync(ack); 339 } 340 return; 341 } 342 343 if (memoryUsage.isFull()) { 344 isFull(context, memoryUsage); 345 fastProducer(context, producerInfo); 346 347 if (isProducerFlowControl() && context.isProducerFlowControl()) { 348 349 if (isFlowControlLogRequired()) { 350 LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 351 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 352 } else { 353 LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 354 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 355 } 356 357 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 358 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 359 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 360 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 361 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 362 } 363 364 // We can avoid blocking due to low usage if the producer is sending a sync message or 365 // if it is using a producer window 366 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 367 synchronized (messagesWaitingForSpace) { 368 messagesWaitingForSpace.add(new Runnable() { 369 @Override 370 public void run() { 371 try { 372 373 // While waiting for space to free up... the 374 // message may have expired. 375 if (message.isExpired()) { 376 broker.messageExpired(context, message, null); 377 getDestinationStatistics().getExpired().increment(); 378 } else { 379 doMessageSend(producerExchange, message); 380 } 381 382 if (sendProducerAck) { 383 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 384 .getSize()); 385 context.getConnection().dispatchAsync(ack); 386 } else { 387 Response response = new Response(); 388 response.setCorrelationId(message.getCommandId()); 389 context.getConnection().dispatchAsync(response); 390 } 391 392 } catch (Exception e) { 393 if (!sendProducerAck && !context.isInRecoveryMode()) { 394 ExceptionResponse response = new ExceptionResponse(e); 395 response.setCorrelationId(message.getCommandId()); 396 context.getConnection().dispatchAsync(response); 397 } 398 } 399 } 400 }); 401 402 registerCallbackForNotFullNotification(); 403 context.setDontSendReponse(true); 404 return; 405 } 406 407 } else { 408 // Producer flow control cannot be used, so we have do the flow control 409 // at the broker by blocking this thread until there is space available. 410 411 if (memoryUsage.isFull()) { 412 if (context.isInTransaction()) { 413 414 int count = 0; 415 while (!memoryUsage.waitForSpace(1000)) { 416 if (context.getStopping().get()) { 417 throw new IOException("Connection closed, send aborted."); 418 } 419 if (count > 2 && context.isInTransaction()) { 420 count = 0; 421 int size = context.getTransaction().size(); 422 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 423 } 424 count++; 425 } 426 } else { 427 waitForSpace( 428 context, 429 producerExchange, 430 memoryUsage, 431 "Usage Manager Memory Usage limit reached. Stopping producer (" 432 + message.getProducerId() 433 + ") to prevent flooding " 434 + getActiveMQDestination().getQualifiedName() 435 + "." 436 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 437 } 438 } 439 440 // The usage manager could have delayed us by the time 441 // we unblock the message could have expired.. 442 if (message.isExpired()) { 443 getDestinationStatistics().getExpired().increment(); 444 LOG.debug("Expired message: {}", message); 445 return; 446 } 447 } 448 } 449 } 450 451 doMessageSend(producerExchange, message); 452 messageDelivered(context, message); 453 if (sendProducerAck) { 454 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 455 context.getConnection().dispatchAsync(ack); 456 } 457 } 458 459 /** 460 * do send the message - this needs to be synchronized to ensure messages 461 * are stored AND dispatched in the right order 462 * 463 * @param producerExchange 464 * @param message 465 * @throws IOException 466 * @throws Exception 467 */ 468 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 469 throws IOException, Exception { 470 final ConnectionContext context = producerExchange.getConnectionContext(); 471 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 472 Future<Object> result = null; 473 474 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 475 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 476 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 477 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 478 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 479 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 480 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 481 throw new javax.jms.ResourceAllocationException(logMessage); 482 } 483 484 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 485 } 486 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 487 } 488 489 message.incrementReferenceCount(); 490 491 if (context.isInTransaction()) { 492 context.getTransaction().addSynchronization(new Synchronization() { 493 @Override 494 public void afterCommit() throws Exception { 495 // It could take while before we receive the commit 496 // operation.. by that time the message could have 497 // expired.. 498 if (message.isExpired()) { 499 if (broker.isExpired(message)) { 500 getDestinationStatistics().getExpired().increment(); 501 broker.messageExpired(context, message, null); 502 } 503 message.decrementReferenceCount(); 504 return; 505 } 506 try { 507 dispatch(context, message); 508 } finally { 509 message.decrementReferenceCount(); 510 } 511 } 512 513 @Override 514 public void afterRollback() throws Exception { 515 message.decrementReferenceCount(); 516 } 517 }); 518 519 } else { 520 try { 521 dispatch(context, message); 522 } finally { 523 message.decrementReferenceCount(); 524 } 525 } 526 527 if (result != null && !result.isCancelled()) { 528 try { 529 result.get(); 530 } catch (CancellationException e) { 531 // ignore - the task has been cancelled if the message 532 // has already been deleted 533 } 534 } 535 } 536 537 private boolean canOptimizeOutPersistence() { 538 return durableSubscribers.size() == 0; 539 } 540 541 @Override 542 public String toString() { 543 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 544 } 545 546 @Override 547 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 548 final MessageReference node) throws IOException { 549 if (topicStore != null && node.isPersistent()) { 550 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 551 SubscriptionKey key = dsub.getSubscriptionKey(); 552 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 553 convertToNonRangedAck(ack, node)); 554 } 555 messageConsumed(context, node); 556 } 557 558 @Override 559 public void gc() { 560 } 561 562 public Message loadMessage(MessageId messageId) throws IOException { 563 return topicStore != null ? topicStore.getMessage(messageId) : null; 564 } 565 566 @Override 567 public void start() throws Exception { 568 if (started.compareAndSet(false, true)) { 569 this.subscriptionRecoveryPolicy.start(); 570 if (memoryUsage != null) { 571 memoryUsage.start(); 572 } 573 574 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 575 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 576 } 577 } 578 } 579 580 @Override 581 public void stop() throws Exception { 582 if (started.compareAndSet(true, false)) { 583 if (taskRunner != null) { 584 taskRunner.shutdown(); 585 } 586 this.subscriptionRecoveryPolicy.stop(); 587 if (memoryUsage != null) { 588 memoryUsage.stop(); 589 } 590 if (this.topicStore != null) { 591 this.topicStore.stop(); 592 } 593 594 scheduler.cancel(expireMessagesTask); 595 } 596 } 597 598 @Override 599 public Message[] browse() { 600 final List<Message> result = new ArrayList<Message>(); 601 doBrowse(result, getMaxBrowsePageSize()); 602 return result.toArray(new Message[result.size()]); 603 } 604 605 private void doBrowse(final List<Message> browseList, final int max) { 606 try { 607 if (topicStore != null) { 608 final List<Message> toExpire = new ArrayList<Message>(); 609 topicStore.recover(new MessageRecoveryListener() { 610 @Override 611 public boolean recoverMessage(Message message) throws Exception { 612 if (message.isExpired()) { 613 toExpire.add(message); 614 } 615 browseList.add(message); 616 return true; 617 } 618 619 @Override 620 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 621 return true; 622 } 623 624 @Override 625 public boolean hasSpace() { 626 return browseList.size() < max; 627 } 628 629 @Override 630 public boolean isDuplicate(MessageId id) { 631 return false; 632 } 633 }); 634 final ConnectionContext connectionContext = createConnectionContext(); 635 for (Message message : toExpire) { 636 for (DurableTopicSubscription sub : durableSubscribers.values()) { 637 if (!sub.isActive()) { 638 message.setRegionDestination(this); 639 messageExpired(connectionContext, sub, message); 640 } 641 } 642 } 643 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 644 if (msgs != null) { 645 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 646 browseList.add(msgs[i]); 647 } 648 } 649 } 650 } catch (Throwable e) { 651 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 652 } 653 } 654 655 @Override 656 public boolean iterate() { 657 synchronized (messagesWaitingForSpace) { 658 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 659 Runnable op = messagesWaitingForSpace.removeFirst(); 660 op.run(); 661 } 662 663 if (!messagesWaitingForSpace.isEmpty()) { 664 registerCallbackForNotFullNotification(); 665 } 666 } 667 return false; 668 } 669 670 private void registerCallbackForNotFullNotification() { 671 // If the usage manager is not full, then the task will not 672 // get called.. 673 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 674 // so call it directly here. 675 sendMessagesWaitingForSpaceTask.run(); 676 } 677 } 678 679 // Properties 680 // ------------------------------------------------------------------------- 681 682 public DispatchPolicy getDispatchPolicy() { 683 return dispatchPolicy; 684 } 685 686 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 687 this.dispatchPolicy = dispatchPolicy; 688 } 689 690 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 691 return subscriptionRecoveryPolicy; 692 } 693 694 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 695 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 696 // allow users to combine retained message policy with other ActiveMQ policies 697 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 698 policy.setWrapped(recoveryPolicy); 699 } else { 700 this.subscriptionRecoveryPolicy = recoveryPolicy; 701 } 702 } 703 704 // Implementation methods 705 // ------------------------------------------------------------------------- 706 707 @Override 708 public final void wakeup() { 709 } 710 711 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 712 // AMQ-2586: Better to leave this stat at zero than to give the user 713 // misleading metrics. 714 // destinationStatistics.getMessages().increment(); 715 destinationStatistics.getEnqueues().increment(); 716 destinationStatistics.getMessageSize().addSize(message.getSize()); 717 MessageEvaluationContext msgContext = null; 718 719 dispatchLock.readLock().lock(); 720 try { 721 if (!subscriptionRecoveryPolicy.add(context, message)) { 722 return; 723 } 724 synchronized (consumers) { 725 if (consumers.isEmpty()) { 726 onMessageWithNoConsumers(context, message); 727 return; 728 } 729 } 730 msgContext = context.getMessageEvaluationContext(); 731 msgContext.setDestination(destination); 732 msgContext.setMessageReference(message); 733 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 734 onMessageWithNoConsumers(context, message); 735 } 736 737 } finally { 738 dispatchLock.readLock().unlock(); 739 if (msgContext != null) { 740 msgContext.clear(); 741 } 742 } 743 } 744 745 private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false); 746 private final Runnable expireMessagesWork = new Runnable() { 747 @Override 748 public void run() { 749 List<Message> browsedMessages = new InsertionCountList<Message>(); 750 doBrowse(browsedMessages, getMaxExpirePageSize()); 751 expiryTaskInProgress.set(false); 752 } 753 }; 754 private final Runnable expireMessagesTask = new Runnable() { 755 @Override 756 public void run() { 757 if (expiryTaskInProgress.compareAndSet(false, true)) { 758 taskRunnerFactor.execute(expireMessagesWork); 759 } 760 } 761 }; 762 763 @Override 764 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 765 broker.messageExpired(context, reference, subs); 766 // AMQ-2586: Better to leave this stat at zero than to give the user 767 // misleading metrics. 768 // destinationStatistics.getMessages().decrement(); 769 destinationStatistics.getExpired().increment(); 770 MessageAck ack = new MessageAck(); 771 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 772 ack.setDestination(destination); 773 ack.setMessageID(reference.getMessageId()); 774 try { 775 if (subs instanceof DurableTopicSubscription) { 776 ((DurableTopicSubscription)subs).removePending(reference); 777 } 778 acknowledge(context, subs, ack, reference); 779 } catch (Exception e) { 780 LOG.error("Failed to remove expired Message from the store ", e); 781 } 782 } 783 784 @Override 785 protected Logger getLog() { 786 return LOG; 787 } 788 789 protected boolean isOptimizeStorage(){ 790 boolean result = false; 791 792 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 793 result = true; 794 for (DurableTopicSubscription s : durableSubscribers.values()) { 795 if (s.isActive()== false){ 796 result = false; 797 break; 798 } 799 if (s.getPrefetchSize()==0){ 800 result = false; 801 break; 802 } 803 if (s.isSlowConsumer()){ 804 result = false; 805 break; 806 } 807 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 808 result = false; 809 break; 810 } 811 } 812 } 813 return result; 814 } 815 816 /** 817 * force a reread of the store - after transaction recovery completion 818 * @param pendingAdditionsCount 819 */ 820 @Override 821 public void clearPendingMessages(int pendingAdditionsCount) { 822 dispatchLock.readLock().lock(); 823 try { 824 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 825 clearPendingAndDispatch(durableTopicSubscription); 826 } 827 } finally { 828 dispatchLock.readLock().unlock(); 829 } 830 } 831 832 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 833 synchronized (durableTopicSubscription.pendingLock) { 834 durableTopicSubscription.pending.clear(); 835 try { 836 durableTopicSubscription.dispatchPending(); 837 } catch (IOException exception) { 838 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 839 durableTopicSubscription, 840 destination, 841 durableTopicSubscription.pending }, exception); 842 } 843 } 844 } 845 846 private void rollback(MessageId poisoned) { 847 dispatchLock.readLock().lock(); 848 try { 849 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 850 durableTopicSubscription.getPending().rollback(poisoned); 851 } 852 } finally { 853 dispatchLock.readLock().unlock(); 854 } 855 } 856 857 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 858 return durableSubscribers; 859 } 860}