001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import org.apache.activemq.advisory.AdvisorySupport;
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.ConnectionContext;
035import org.apache.activemq.broker.ProducerBrokerExchange;
036import org.apache.activemq.broker.region.policy.DispatchPolicy;
037import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
039import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
040import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
041import org.apache.activemq.broker.util.InsertionCountList;
042import org.apache.activemq.command.ActiveMQDestination;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.Message;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageId;
047import org.apache.activemq.command.ProducerAck;
048import org.apache.activemq.command.ProducerInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SubscriptionInfo;
051import org.apache.activemq.filter.MessageEvaluationContext;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.store.MessageRecoveryListener;
054import org.apache.activemq.store.TopicMessageStore;
055import org.apache.activemq.thread.Task;
056import org.apache.activemq.thread.TaskRunner;
057import org.apache.activemq.thread.TaskRunnerFactory;
058import org.apache.activemq.transaction.Synchronization;
059import org.apache.activemq.util.SubscriptionKey;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * The Topic is a destination that sends a copy of a message to every active
065 * Subscription registered.
066 */
067public class Topic extends BaseDestination implements Task {
068    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
069    private final TopicMessageStore topicStore;
070    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
071    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
072    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
073    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
074    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
075    private final TaskRunner taskRunner;
076    private final TaskRunnerFactory taskRunnerFactor;
077    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
078    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
079        @Override
080        public void run() {
081            try {
082                Topic.this.taskRunner.wakeup();
083            } catch (InterruptedException e) {
084            }
085        }
086    };
087
088    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
089            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
090        super(brokerService, store, destination, parentStats);
091        this.topicStore = store;
092        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
093        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
094        this.taskRunnerFactor = taskFactory;
095    }
096
097    @Override
098    public void initialize() throws Exception {
099        super.initialize();
100        // set non default subscription recovery policy (override policyEntries)
101        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
102            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
103            setAlwaysRetroactive(true);
104        }
105        if (store != null) {
106            // AMQ-2586: Better to leave this stat at zero than to give the user
107            // misleading metrics.
108            // int messageCount = store.getMessageCount();
109            // destinationStatistics.getMessages().setCount(messageCount);
110        }
111    }
112
113    @Override
114    public List<Subscription> getConsumers() {
115        synchronized (consumers) {
116            return new ArrayList<Subscription>(consumers);
117        }
118    }
119
120    public boolean lock(MessageReference node, LockOwner sub) {
121        return true;
122    }
123
124    @Override
125    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
126        if (!sub.getConsumerInfo().isDurable()) {
127
128            // Do a retroactive recovery if needed.
129            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
130
131                // synchronize with dispatch method so that no new messages are sent
132                // while we are recovering a subscription to avoid out of order messages.
133                dispatchLock.writeLock().lock();
134                try {
135                    boolean applyRecovery = false;
136                    synchronized (consumers) {
137                        if (!consumers.contains(sub)){
138                            sub.add(context, this);
139                            consumers.add(sub);
140                            applyRecovery=true;
141                            super.addSubscription(context, sub);
142                        }
143                    }
144                    if (applyRecovery){
145                        subscriptionRecoveryPolicy.recover(context, this, sub);
146                    }
147                } finally {
148                    dispatchLock.writeLock().unlock();
149                }
150
151            } else {
152                synchronized (consumers) {
153                    if (!consumers.contains(sub)){
154                        sub.add(context, this);
155                        consumers.add(sub);
156                        super.addSubscription(context, sub);
157                    }
158                }
159            }
160        } else {
161            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
162            super.addSubscription(context, sub);
163            sub.add(context, this);
164            if(dsub.isActive()) {
165                synchronized (consumers) {
166                    boolean hasSubscription = false;
167
168                    if (consumers.size() == 0) {
169                        hasSubscription = false;
170                    } else {
171                        for (Subscription currentSub : consumers) {
172                            if (currentSub.getConsumerInfo().isDurable()) {
173                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
174                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
175                                    hasSubscription = true;
176                                    break;
177                                }
178                            }
179                        }
180                    }
181
182                    if (!hasSubscription) {
183                        consumers.add(sub);
184                    }
185                }
186            }
187            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
188        }
189    }
190
191    @Override
192    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
193        if (!sub.getConsumerInfo().isDurable()) {
194            boolean removed = false;
195            synchronized (consumers) {
196                removed = consumers.remove(sub);
197            }
198            if (removed) {
199                super.removeSubscription(context, sub, lastDeliveredSequenceId);
200            }
201        }
202        sub.remove(context, this);
203    }
204
205    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
206        if (topicStore != null) {
207            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
208            DurableTopicSubscription removed = durableSubscribers.remove(key);
209            if (removed != null) {
210                destinationStatistics.getConsumers().decrement();
211                // deactivate and remove
212                removed.deactivate(false, 0l);
213                consumers.remove(removed);
214            }
215        }
216    }
217
218    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
219        // synchronize with dispatch method so that no new messages are sent
220        // while we are recovering a subscription to avoid out of order messages.
221        dispatchLock.writeLock().lock();
222        try {
223
224            if (topicStore == null) {
225                return;
226            }
227
228            // Recover the durable subscription.
229            String clientId = subscription.getSubscriptionKey().getClientId();
230            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
231            String selector = subscription.getConsumerInfo().getSelector();
232            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
233            if (info != null) {
234                // Check to see if selector changed.
235                String s1 = info.getSelector();
236                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
237                    // Need to delete the subscription
238                    topicStore.deleteSubscription(clientId, subscriptionName);
239                    info = null;
240                    synchronized (consumers) {
241                        consumers.remove(subscription);
242                    }
243                } else {
244                    synchronized (consumers) {
245                        if (!consumers.contains(subscription)) {
246                            consumers.add(subscription);
247                        }
248                    }
249                }
250            }
251
252            // Do we need to create the subscription?
253            if (info == null) {
254                info = new SubscriptionInfo();
255                info.setClientId(clientId);
256                info.setSelector(selector);
257                info.setSubscriptionName(subscriptionName);
258                info.setDestination(getActiveMQDestination());
259                // This destination is an actual destination id.
260                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
261                // This destination might be a pattern
262                synchronized (consumers) {
263                    consumers.add(subscription);
264                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
265                }
266            }
267
268            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
269            msgContext.setDestination(destination);
270            if (subscription.isRecoveryRequired()) {
271                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
272                    @Override
273                    public boolean recoverMessage(Message message) throws Exception {
274                        message.setRegionDestination(Topic.this);
275                        try {
276                            msgContext.setMessageReference(message);
277                            if (subscription.matches(message, msgContext)) {
278                                subscription.add(message);
279                            }
280                        } catch (IOException e) {
281                            LOG.error("Failed to recover this message {}", message, e);
282                        }
283                        return true;
284                    }
285
286                    @Override
287                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
288                        throw new RuntimeException("Should not be called.");
289                    }
290
291                    @Override
292                    public boolean hasSpace() {
293                        return true;
294                    }
295
296                    @Override
297                    public boolean isDuplicate(MessageId id) {
298                        return false;
299                    }
300                });
301            }
302        } finally {
303            dispatchLock.writeLock().unlock();
304        }
305    }
306
307    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
308        synchronized (consumers) {
309            consumers.remove(sub);
310        }
311        sub.remove(context, this, dispatched);
312    }
313
314    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
315        if (subscription.getConsumerInfo().isRetroactive()) {
316            subscriptionRecoveryPolicy.recover(context, this, subscription);
317        }
318    }
319
320    @Override
321    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
322        final ConnectionContext context = producerExchange.getConnectionContext();
323
324        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
325        producerExchange.incrementSend();
326        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
327                && !context.isInRecoveryMode();
328
329        message.setRegionDestination(this);
330
331        // There is delay between the client sending it and it arriving at the
332        // destination.. it may have expired.
333        if (message.isExpired()) {
334            broker.messageExpired(context, message, null);
335            getDestinationStatistics().getExpired().increment();
336            if (sendProducerAck) {
337                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
338                context.getConnection().dispatchAsync(ack);
339            }
340            return;
341        }
342
343        if (memoryUsage.isFull()) {
344            isFull(context, memoryUsage);
345            fastProducer(context, producerInfo);
346
347            if (isProducerFlowControl() && context.isProducerFlowControl()) {
348
349                if (isFlowControlLogRequired()) {
350                    LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
351                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
352                } else {
353                    LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
354                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
355                }
356
357                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
358                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
359                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
360                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
361                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
362                }
363
364                // We can avoid blocking due to low usage if the producer is sending a sync message or
365                // if it is using a producer window
366                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
367                    synchronized (messagesWaitingForSpace) {
368                        messagesWaitingForSpace.add(new Runnable() {
369                            @Override
370                            public void run() {
371                                try {
372
373                                    // While waiting for space to free up... the
374                                    // message may have expired.
375                                    if (message.isExpired()) {
376                                        broker.messageExpired(context, message, null);
377                                        getDestinationStatistics().getExpired().increment();
378                                    } else {
379                                        doMessageSend(producerExchange, message);
380                                    }
381
382                                    if (sendProducerAck) {
383                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
384                                                .getSize());
385                                        context.getConnection().dispatchAsync(ack);
386                                    } else {
387                                        Response response = new Response();
388                                        response.setCorrelationId(message.getCommandId());
389                                        context.getConnection().dispatchAsync(response);
390                                    }
391
392                                } catch (Exception e) {
393                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
394                                        ExceptionResponse response = new ExceptionResponse(e);
395                                        response.setCorrelationId(message.getCommandId());
396                                        context.getConnection().dispatchAsync(response);
397                                    }
398                                }
399                            }
400                        });
401
402                        registerCallbackForNotFullNotification();
403                        context.setDontSendReponse(true);
404                        return;
405                    }
406
407                } else {
408                    // Producer flow control cannot be used, so we have do the flow control
409                    // at the broker by blocking this thread until there is space available.
410
411                    if (memoryUsage.isFull()) {
412                        if (context.isInTransaction()) {
413
414                            int count = 0;
415                            while (!memoryUsage.waitForSpace(1000)) {
416                                if (context.getStopping().get()) {
417                                    throw new IOException("Connection closed, send aborted.");
418                                }
419                                if (count > 2 && context.isInTransaction()) {
420                                    count = 0;
421                                    int size = context.getTransaction().size();
422                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
423                                }
424                                count++;
425                            }
426                        } else {
427                            waitForSpace(
428                                    context,
429                                    producerExchange,
430                                    memoryUsage,
431                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
432                                            + message.getProducerId()
433                                            + ") to prevent flooding "
434                                            + getActiveMQDestination().getQualifiedName()
435                                            + "."
436                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
437                        }
438                    }
439
440                    // The usage manager could have delayed us by the time
441                    // we unblock the message could have expired..
442                    if (message.isExpired()) {
443                        getDestinationStatistics().getExpired().increment();
444                        LOG.debug("Expired message: {}", message);
445                        return;
446                    }
447                }
448            }
449        }
450
451        doMessageSend(producerExchange, message);
452        messageDelivered(context, message);
453        if (sendProducerAck) {
454            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
455            context.getConnection().dispatchAsync(ack);
456        }
457    }
458
459    /**
460     * do send the message - this needs to be synchronized to ensure messages
461     * are stored AND dispatched in the right order
462     *
463     * @param producerExchange
464     * @param message
465     * @throws IOException
466     * @throws Exception
467     */
468    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
469            throws IOException, Exception {
470        final ConnectionContext context = producerExchange.getConnectionContext();
471        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
472        Future<Object> result = null;
473
474        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
475            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
476                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
477                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
478                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
479                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
480                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
481                    throw new javax.jms.ResourceAllocationException(logMessage);
482                }
483
484                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
485            }
486            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
487        }
488
489        message.incrementReferenceCount();
490
491        if (context.isInTransaction()) {
492            context.getTransaction().addSynchronization(new Synchronization() {
493                @Override
494                public void afterCommit() throws Exception {
495                    // It could take while before we receive the commit
496                    // operation.. by that time the message could have
497                    // expired..
498                    if (message.isExpired()) {
499                        if (broker.isExpired(message)) {
500                            getDestinationStatistics().getExpired().increment();
501                            broker.messageExpired(context, message, null);
502                        }
503                        message.decrementReferenceCount();
504                        return;
505                    }
506                    try {
507                        dispatch(context, message);
508                    } finally {
509                        message.decrementReferenceCount();
510                    }
511                }
512
513                @Override
514                public void afterRollback() throws Exception {
515                    message.decrementReferenceCount();
516                }
517            });
518
519        } else {
520            try {
521                dispatch(context, message);
522            } finally {
523                message.decrementReferenceCount();
524            }
525        }
526
527        if (result != null && !result.isCancelled()) {
528            try {
529                result.get();
530            } catch (CancellationException e) {
531                // ignore - the task has been cancelled if the message
532                // has already been deleted
533            }
534        }
535    }
536
537    private boolean canOptimizeOutPersistence() {
538        return durableSubscribers.size() == 0;
539    }
540
541    @Override
542    public String toString() {
543        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
544    }
545
546    @Override
547    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
548            final MessageReference node) throws IOException {
549        if (topicStore != null && node.isPersistent()) {
550            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
551            SubscriptionKey key = dsub.getSubscriptionKey();
552            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
553                    convertToNonRangedAck(ack, node));
554        }
555        messageConsumed(context, node);
556    }
557
558    @Override
559    public void gc() {
560    }
561
562    public Message loadMessage(MessageId messageId) throws IOException {
563        return topicStore != null ? topicStore.getMessage(messageId) : null;
564    }
565
566    @Override
567    public void start() throws Exception {
568        if (started.compareAndSet(false, true)) {
569            this.subscriptionRecoveryPolicy.start();
570            if (memoryUsage != null) {
571                memoryUsage.start();
572            }
573
574            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
575                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
576            }
577        }
578    }
579
580    @Override
581    public void stop() throws Exception {
582        if (started.compareAndSet(true, false)) {
583            if (taskRunner != null) {
584                taskRunner.shutdown();
585            }
586            this.subscriptionRecoveryPolicy.stop();
587            if (memoryUsage != null) {
588                memoryUsage.stop();
589            }
590            if (this.topicStore != null) {
591                this.topicStore.stop();
592            }
593
594            scheduler.cancel(expireMessagesTask);
595        }
596    }
597
598    @Override
599    public Message[] browse() {
600        final List<Message> result = new ArrayList<Message>();
601        doBrowse(result, getMaxBrowsePageSize());
602        return result.toArray(new Message[result.size()]);
603    }
604
605    private void doBrowse(final List<Message> browseList, final int max) {
606        try {
607            if (topicStore != null) {
608                final List<Message> toExpire = new ArrayList<Message>();
609                topicStore.recover(new MessageRecoveryListener() {
610                    @Override
611                    public boolean recoverMessage(Message message) throws Exception {
612                        if (message.isExpired()) {
613                            toExpire.add(message);
614                        }
615                        browseList.add(message);
616                        return true;
617                    }
618
619                    @Override
620                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
621                        return true;
622                    }
623
624                    @Override
625                    public boolean hasSpace() {
626                        return browseList.size() < max;
627                    }
628
629                    @Override
630                    public boolean isDuplicate(MessageId id) {
631                        return false;
632                    }
633                });
634                final ConnectionContext connectionContext = createConnectionContext();
635                for (Message message : toExpire) {
636                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
637                        if (!sub.isActive()) {
638                            message.setRegionDestination(this);
639                            messageExpired(connectionContext, sub, message);
640                        }
641                    }
642                }
643                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
644                if (msgs != null) {
645                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
646                        browseList.add(msgs[i]);
647                    }
648                }
649            }
650        } catch (Throwable e) {
651            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
652        }
653    }
654
655    @Override
656    public boolean iterate() {
657        synchronized (messagesWaitingForSpace) {
658            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
659                Runnable op = messagesWaitingForSpace.removeFirst();
660                op.run();
661            }
662
663            if (!messagesWaitingForSpace.isEmpty()) {
664                registerCallbackForNotFullNotification();
665            }
666        }
667        return false;
668    }
669
670    private void registerCallbackForNotFullNotification() {
671        // If the usage manager is not full, then the task will not
672        // get called..
673        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
674            // so call it directly here.
675            sendMessagesWaitingForSpaceTask.run();
676        }
677    }
678
679    // Properties
680    // -------------------------------------------------------------------------
681
682    public DispatchPolicy getDispatchPolicy() {
683        return dispatchPolicy;
684    }
685
686    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
687        this.dispatchPolicy = dispatchPolicy;
688    }
689
690    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
691        return subscriptionRecoveryPolicy;
692    }
693
694    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
695        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
696            // allow users to combine retained message policy with other ActiveMQ policies
697            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
698            policy.setWrapped(recoveryPolicy);
699        } else {
700            this.subscriptionRecoveryPolicy = recoveryPolicy;
701        }
702    }
703
704    // Implementation methods
705    // -------------------------------------------------------------------------
706
707    @Override
708    public final void wakeup() {
709    }
710
711    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
712        // AMQ-2586: Better to leave this stat at zero than to give the user
713        // misleading metrics.
714        // destinationStatistics.getMessages().increment();
715        destinationStatistics.getEnqueues().increment();
716        destinationStatistics.getMessageSize().addSize(message.getSize());
717        MessageEvaluationContext msgContext = null;
718
719        dispatchLock.readLock().lock();
720        try {
721            if (!subscriptionRecoveryPolicy.add(context, message)) {
722                return;
723            }
724            synchronized (consumers) {
725                if (consumers.isEmpty()) {
726                    onMessageWithNoConsumers(context, message);
727                    return;
728                }
729            }
730            msgContext = context.getMessageEvaluationContext();
731            msgContext.setDestination(destination);
732            msgContext.setMessageReference(message);
733            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
734                onMessageWithNoConsumers(context, message);
735            }
736
737        } finally {
738            dispatchLock.readLock().unlock();
739            if (msgContext != null) {
740                msgContext.clear();
741            }
742        }
743    }
744
745    private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
746    private final Runnable expireMessagesWork = new Runnable() {
747        @Override
748        public void run() {
749            List<Message> browsedMessages = new InsertionCountList<Message>();
750            doBrowse(browsedMessages, getMaxExpirePageSize());
751            expiryTaskInProgress.set(false);
752        }
753    };
754    private final Runnable expireMessagesTask = new Runnable() {
755        @Override
756        public void run() {
757            if (expiryTaskInProgress.compareAndSet(false, true)) {
758                taskRunnerFactor.execute(expireMessagesWork);
759            }
760        }
761    };
762
763    @Override
764    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
765        broker.messageExpired(context, reference, subs);
766        // AMQ-2586: Better to leave this stat at zero than to give the user
767        // misleading metrics.
768        // destinationStatistics.getMessages().decrement();
769        destinationStatistics.getExpired().increment();
770        MessageAck ack = new MessageAck();
771        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
772        ack.setDestination(destination);
773        ack.setMessageID(reference.getMessageId());
774        try {
775            if (subs instanceof DurableTopicSubscription) {
776                ((DurableTopicSubscription)subs).removePending(reference);
777            }
778            acknowledge(context, subs, ack, reference);
779        } catch (Exception e) {
780            LOG.error("Failed to remove expired Message from the store ", e);
781        }
782    }
783
784    @Override
785    protected Logger getLog() {
786        return LOG;
787    }
788
789    protected boolean isOptimizeStorage(){
790        boolean result = false;
791
792        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
793                result = true;
794                for (DurableTopicSubscription s : durableSubscribers.values()) {
795                    if (s.isActive()== false){
796                        result = false;
797                        break;
798                    }
799                    if (s.getPrefetchSize()==0){
800                        result = false;
801                        break;
802                    }
803                    if (s.isSlowConsumer()){
804                        result = false;
805                        break;
806                    }
807                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
808                        result = false;
809                        break;
810                    }
811                }
812        }
813        return result;
814    }
815
816    /**
817     * force a reread of the store - after transaction recovery completion
818     * @param pendingAdditionsCount
819     */
820    @Override
821    public void clearPendingMessages(int pendingAdditionsCount) {
822        dispatchLock.readLock().lock();
823        try {
824            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
825                clearPendingAndDispatch(durableTopicSubscription);
826            }
827        } finally {
828            dispatchLock.readLock().unlock();
829        }
830    }
831
832    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
833        synchronized (durableTopicSubscription.pendingLock) {
834            durableTopicSubscription.pending.clear();
835            try {
836                durableTopicSubscription.dispatchPending();
837            } catch (IOException exception) {
838                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
839                        durableTopicSubscription,
840                        destination,
841                        durableTopicSubscription.pending }, exception);
842            }
843        }
844    }
845
846    private void rollback(MessageId poisoned) {
847        dispatchLock.readLock().lock();
848        try {
849            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
850                durableTopicSubscription.getPending().rollback(poisoned);
851            }
852        } finally {
853            dispatchLock.readLock().unlock();
854        }
855    }
856
857    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
858        return durableSubscribers;
859    }
860}