001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import org.apache.activemq.advisory.AdvisorySupport;
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.ConnectionContext;
035import org.apache.activemq.broker.ProducerBrokerExchange;
036import org.apache.activemq.broker.region.policy.DispatchPolicy;
037import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
039import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
040import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
041import org.apache.activemq.broker.util.InsertionCountList;
042import org.apache.activemq.command.ActiveMQDestination;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.Message;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageId;
047import org.apache.activemq.command.ProducerAck;
048import org.apache.activemq.command.ProducerInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SubscriptionInfo;
051import org.apache.activemq.filter.MessageEvaluationContext;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.store.MessageRecoveryListener;
054import org.apache.activemq.store.TopicMessageStore;
055import org.apache.activemq.thread.Task;
056import org.apache.activemq.thread.TaskRunner;
057import org.apache.activemq.thread.TaskRunnerFactory;
058import org.apache.activemq.transaction.Synchronization;
059import org.apache.activemq.util.SubscriptionKey;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import javax.jms.JMSException;
064
065import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
066
067/**
068 * The Topic is a destination that sends a copy of a message to every active
069 * Subscription registered.
070 */
071public class Topic extends BaseDestination implements Task {
072    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
073    private final TopicMessageStore topicStore;
074    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
075    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
076    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
077    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
078    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
079    private final TaskRunner taskRunner;
080    private final TaskRunnerFactory taskRunnerFactor;
081    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
082    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
083        @Override
084        public void run() {
085            try {
086                Topic.this.taskRunner.wakeup();
087            } catch (InterruptedException e) {
088            }
089        }
090    };
091
092    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
093            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
094        super(brokerService, store, destination, parentStats);
095        this.topicStore = store;
096        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
097        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
098        this.taskRunnerFactor = taskFactory;
099    }
100
101    @Override
102    public void initialize() throws Exception {
103        super.initialize();
104        // set non default subscription recovery policy (override policyEntries)
105        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
106            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
107            setAlwaysRetroactive(true);
108        }
109        if (store != null) {
110            // AMQ-2586: Better to leave this stat at zero than to give the user
111            // misleading metrics.
112            // int messageCount = store.getMessageCount();
113            // destinationStatistics.getMessages().setCount(messageCount);
114        }
115    }
116
117    @Override
118    public List<Subscription> getConsumers() {
119        synchronized (consumers) {
120            return new ArrayList<Subscription>(consumers);
121        }
122    }
123
124    public boolean lock(MessageReference node, LockOwner sub) {
125        return true;
126    }
127
128    @Override
129    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
130        if (!sub.getConsumerInfo().isDurable()) {
131
132            // Do a retroactive recovery if needed.
133            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
134
135                // synchronize with dispatch method so that no new messages are sent
136                // while we are recovering a subscription to avoid out of order messages.
137                dispatchLock.writeLock().lock();
138                try {
139                    boolean applyRecovery = false;
140                    synchronized (consumers) {
141                        if (!consumers.contains(sub)){
142                            sub.add(context, this);
143                            consumers.add(sub);
144                            applyRecovery=true;
145                            super.addSubscription(context, sub);
146                        }
147                    }
148                    if (applyRecovery){
149                        subscriptionRecoveryPolicy.recover(context, this, sub);
150                    }
151                } finally {
152                    dispatchLock.writeLock().unlock();
153                }
154
155            } else {
156                synchronized (consumers) {
157                    if (!consumers.contains(sub)){
158                        sub.add(context, this);
159                        consumers.add(sub);
160                        super.addSubscription(context, sub);
161                    }
162                }
163            }
164        } else {
165            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
166            super.addSubscription(context, sub);
167            sub.add(context, this);
168            if(dsub.isActive()) {
169                synchronized (consumers) {
170                    boolean hasSubscription = false;
171
172                    if (consumers.size() == 0) {
173                        hasSubscription = false;
174                    } else {
175                        for (Subscription currentSub : consumers) {
176                            if (currentSub.getConsumerInfo().isDurable()) {
177                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
178                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
179                                    hasSubscription = true;
180                                    break;
181                                }
182                            }
183                        }
184                    }
185
186                    if (!hasSubscription) {
187                        consumers.add(sub);
188                    }
189                }
190            }
191            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
192        }
193    }
194
195    @Override
196    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
197        if (!sub.getConsumerInfo().isDurable()) {
198            boolean removed = false;
199            synchronized (consumers) {
200                removed = consumers.remove(sub);
201            }
202            if (removed) {
203                super.removeSubscription(context, sub, lastDeliveredSequenceId);
204            }
205        }
206        sub.remove(context, this);
207    }
208
209    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
210        if (topicStore != null) {
211            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
212            DurableTopicSubscription removed = durableSubscribers.remove(key);
213            if (removed != null) {
214                destinationStatistics.getConsumers().decrement();
215                // deactivate and remove
216                removed.deactivate(false, 0l);
217                consumers.remove(removed);
218            }
219        }
220    }
221
222    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
223        // synchronize with dispatch method so that no new messages are sent
224        // while we are recovering a subscription to avoid out of order messages.
225        dispatchLock.writeLock().lock();
226        try {
227
228            if (topicStore == null) {
229                return;
230            }
231
232            // Recover the durable subscription.
233            String clientId = subscription.getSubscriptionKey().getClientId();
234            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
235            String selector = subscription.getConsumerInfo().getSelector();
236            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
237            if (info != null) {
238                // Check to see if selector changed.
239                String s1 = info.getSelector();
240                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
241                    // Need to delete the subscription
242                    topicStore.deleteSubscription(clientId, subscriptionName);
243                    info = null;
244                    synchronized (consumers) {
245                        consumers.remove(subscription);
246                    }
247                } else {
248                    synchronized (consumers) {
249                        if (!consumers.contains(subscription)) {
250                            consumers.add(subscription);
251                        }
252                    }
253                }
254            }
255
256            // Do we need to create the subscription?
257            if (info == null) {
258                info = new SubscriptionInfo();
259                info.setClientId(clientId);
260                info.setSelector(selector);
261                info.setSubscriptionName(subscriptionName);
262                info.setDestination(getActiveMQDestination());
263                // This destination is an actual destination id.
264                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
265                // This destination might be a pattern
266                synchronized (consumers) {
267                    consumers.add(subscription);
268                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
269                }
270            }
271
272            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
273            msgContext.setDestination(destination);
274            if (subscription.isRecoveryRequired()) {
275                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
276                    @Override
277                    public boolean recoverMessage(Message message) throws Exception {
278                        message.setRegionDestination(Topic.this);
279                        try {
280                            msgContext.setMessageReference(message);
281                            if (subscription.matches(message, msgContext)) {
282                                subscription.add(message);
283                            }
284                        } catch (IOException e) {
285                            LOG.error("Failed to recover this message {}", message, e);
286                        }
287                        return true;
288                    }
289
290                    @Override
291                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
292                        throw new RuntimeException("Should not be called.");
293                    }
294
295                    @Override
296                    public boolean hasSpace() {
297                        return true;
298                    }
299
300                    @Override
301                    public boolean isDuplicate(MessageId id) {
302                        return false;
303                    }
304
305
306                    @Override
307                    public boolean canRecoveryNextMessage() {
308                        return true;
309                    }
310                });
311            }
312        } finally {
313            dispatchLock.writeLock().unlock();
314        }
315    }
316
317    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
318        synchronized (consumers) {
319            consumers.remove(sub);
320        }
321        sub.remove(context, this, dispatched);
322    }
323
324    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
325        if (subscription.getConsumerInfo().isRetroactive()) {
326            subscriptionRecoveryPolicy.recover(context, this, subscription);
327        }
328    }
329
330    @Override
331    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
332        final ConnectionContext context = producerExchange.getConnectionContext();
333
334        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
335        producerExchange.incrementSend();
336        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
337                && !context.isInRecoveryMode();
338
339        message.setRegionDestination(this);
340
341        // There is delay between the client sending it and it arriving at the
342        // destination.. it may have expired.
343        if (message.isExpired()) {
344            broker.messageExpired(context, message, null);
345            getDestinationStatistics().getExpired().increment();
346            if (sendProducerAck) {
347                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
348                context.getConnection().dispatchAsync(ack);
349            }
350            return;
351        }
352
353        if (memoryUsage.isFull()) {
354            isFull(context, memoryUsage);
355            fastProducer(context, producerInfo);
356
357            if (isProducerFlowControl() && context.isProducerFlowControl()) {
358
359                if (isFlowControlLogRequired()) {
360                    LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
361                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
362                } else {
363                    LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
364                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
365                }
366
367                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
368                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
369                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
370                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
371                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
372                }
373
374                // We can avoid blocking due to low usage if the producer is sending a sync message or
375                // if it is using a producer window
376                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
377                    synchronized (messagesWaitingForSpace) {
378                        messagesWaitingForSpace.add(new Runnable() {
379                            @Override
380                            public void run() {
381                                try {
382
383                                    // While waiting for space to free up...
384                                    // the transaction may be done
385                                    if (message.isInTransaction()) {
386                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
387                                            throw new JMSException("Send transaction completed while waiting for space");
388                                        }
389                                    }
390
391                                    // the message may have expired.
392                                    if (message.isExpired()) {
393                                        broker.messageExpired(context, message, null);
394                                        getDestinationStatistics().getExpired().increment();
395                                    } else {
396                                        doMessageSend(producerExchange, message);
397                                    }
398
399                                    if (sendProducerAck) {
400                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
401                                                .getSize());
402                                        context.getConnection().dispatchAsync(ack);
403                                    } else {
404                                        Response response = new Response();
405                                        response.setCorrelationId(message.getCommandId());
406                                        context.getConnection().dispatchAsync(response);
407                                    }
408
409                                } catch (Exception e) {
410                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
411                                        ExceptionResponse response = new ExceptionResponse(e);
412                                        response.setCorrelationId(message.getCommandId());
413                                        context.getConnection().dispatchAsync(response);
414                                    }
415                                }
416                            }
417                        });
418
419                        registerCallbackForNotFullNotification();
420                        context.setDontSendReponse(true);
421                        return;
422                    }
423
424                } else {
425                    // Producer flow control cannot be used, so we have do the flow control
426                    // at the broker by blocking this thread until there is space available.
427
428                    if (memoryUsage.isFull()) {
429                        if (context.isInTransaction()) {
430
431                            int count = 0;
432                            while (!memoryUsage.waitForSpace(1000)) {
433                                if (context.getStopping().get()) {
434                                    throw new IOException("Connection closed, send aborted.");
435                                }
436                                if (count > 2 && context.isInTransaction()) {
437                                    count = 0;
438                                    int size = context.getTransaction().size();
439                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
440                                }
441                                count++;
442                            }
443                        } else {
444                            waitForSpace(
445                                    context,
446                                    producerExchange,
447                                    memoryUsage,
448                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
449                                            + message.getProducerId()
450                                            + ") to prevent flooding "
451                                            + getActiveMQDestination().getQualifiedName()
452                                            + "."
453                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
454                        }
455                    }
456
457                    // The usage manager could have delayed us by the time
458                    // we unblock the message could have expired..
459                    if (message.isExpired()) {
460                        getDestinationStatistics().getExpired().increment();
461                        LOG.debug("Expired message: {}", message);
462                        return;
463                    }
464                }
465            }
466        }
467
468        doMessageSend(producerExchange, message);
469        messageDelivered(context, message);
470        if (sendProducerAck) {
471            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
472            context.getConnection().dispatchAsync(ack);
473        }
474    }
475
476    /**
477     * do send the message - this needs to be synchronized to ensure messages
478     * are stored AND dispatched in the right order
479     *
480     * @param producerExchange
481     * @param message
482     * @throws IOException
483     * @throws Exception
484     */
485    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
486            throws IOException, Exception {
487        final ConnectionContext context = producerExchange.getConnectionContext();
488        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
489        Future<Object> result = null;
490
491        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
492            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
493                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
494                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
495                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
496                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
497                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
498                    throw new javax.jms.ResourceAllocationException(logMessage);
499                }
500
501                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
502            }
503            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
504        }
505
506        message.incrementReferenceCount();
507
508        if (context.isInTransaction()) {
509            context.getTransaction().addSynchronization(new Synchronization() {
510                @Override
511                public void afterCommit() throws Exception {
512                    // It could take while before we receive the commit
513                    // operation.. by that time the message could have
514                    // expired..
515                    if (message.isExpired()) {
516                        if (broker.isExpired(message)) {
517                            getDestinationStatistics().getExpired().increment();
518                            broker.messageExpired(context, message, null);
519                        }
520                        message.decrementReferenceCount();
521                        return;
522                    }
523                    try {
524                        dispatch(context, message);
525                    } finally {
526                        message.decrementReferenceCount();
527                    }
528                }
529
530                @Override
531                public void afterRollback() throws Exception {
532                    message.decrementReferenceCount();
533                }
534            });
535
536        } else {
537            try {
538                dispatch(context, message);
539            } finally {
540                message.decrementReferenceCount();
541            }
542        }
543
544        if (result != null && !result.isCancelled()) {
545            try {
546                result.get();
547            } catch (CancellationException e) {
548                // ignore - the task has been cancelled if the message
549                // has already been deleted
550            }
551        }
552    }
553
554    private boolean canOptimizeOutPersistence() {
555        return durableSubscribers.size() == 0;
556    }
557
558    @Override
559    public String toString() {
560        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
561    }
562
563    @Override
564    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
565            final MessageReference node) throws IOException {
566        if (topicStore != null && node.isPersistent()) {
567            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
568            SubscriptionKey key = dsub.getSubscriptionKey();
569            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
570                    convertToNonRangedAck(ack, node));
571        }
572        messageConsumed(context, node);
573    }
574
575    @Override
576    public void gc() {
577    }
578
579    public Message loadMessage(MessageId messageId) throws IOException {
580        return topicStore != null ? topicStore.getMessage(messageId) : null;
581    }
582
583    @Override
584    public void start() throws Exception {
585        if (started.compareAndSet(false, true)) {
586            this.subscriptionRecoveryPolicy.start();
587            if (memoryUsage != null) {
588                memoryUsage.start();
589            }
590
591            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
592                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
593            }
594        }
595    }
596
597    @Override
598    public void stop() throws Exception {
599        if (started.compareAndSet(true, false)) {
600            if (taskRunner != null) {
601                taskRunner.shutdown();
602            }
603            this.subscriptionRecoveryPolicy.stop();
604            if (memoryUsage != null) {
605                memoryUsage.stop();
606            }
607            if (this.topicStore != null) {
608                this.topicStore.stop();
609            }
610
611            scheduler.cancel(expireMessagesTask);
612        }
613    }
614
615    @Override
616    public Message[] browse() {
617        final List<Message> result = new ArrayList<Message>();
618        doBrowse(result, getMaxBrowsePageSize());
619        return result.toArray(new Message[result.size()]);
620    }
621
622    private void doBrowse(final List<Message> browseList, final int max) {
623        try {
624            if (topicStore != null) {
625                final List<Message> toExpire = new ArrayList<Message>();
626                topicStore.recover(new MessageRecoveryListener() {
627                    @Override
628                    public boolean recoverMessage(Message message) throws Exception {
629                        if (message.isExpired()) {
630                            toExpire.add(message);
631                        }
632                        browseList.add(message);
633                        return true;
634                    }
635
636                    @Override
637                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
638                        return true;
639                    }
640
641                    @Override
642                    public boolean hasSpace() {
643                        return browseList.size() < max;
644                    }
645
646                    @Override
647                    public boolean isDuplicate(MessageId id) {
648                        return false;
649                    }
650
651                    @Override
652                    public boolean canRecoveryNextMessage() {
653                        return true;
654                    }
655                });
656                final ConnectionContext connectionContext = createConnectionContext();
657                for (Message message : toExpire) {
658                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
659                        if (!sub.isActive()) {
660                            message.setRegionDestination(this);
661                            messageExpired(connectionContext, sub, message);
662                        }
663                    }
664                }
665                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
666                if (msgs != null) {
667                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
668                        browseList.add(msgs[i]);
669                    }
670                }
671            }
672        } catch (Throwable e) {
673            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
674        }
675    }
676
677    @Override
678    public boolean iterate() {
679        synchronized (messagesWaitingForSpace) {
680            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
681                Runnable op = messagesWaitingForSpace.removeFirst();
682                op.run();
683            }
684
685            if (!messagesWaitingForSpace.isEmpty()) {
686                registerCallbackForNotFullNotification();
687            }
688        }
689        return false;
690    }
691
692    private void registerCallbackForNotFullNotification() {
693        // If the usage manager is not full, then the task will not
694        // get called..
695        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
696            // so call it directly here.
697            sendMessagesWaitingForSpaceTask.run();
698        }
699    }
700
701    // Properties
702    // -------------------------------------------------------------------------
703
704    public DispatchPolicy getDispatchPolicy() {
705        return dispatchPolicy;
706    }
707
708    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
709        this.dispatchPolicy = dispatchPolicy;
710    }
711
712    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
713        return subscriptionRecoveryPolicy;
714    }
715
716    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
717        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
718            // allow users to combine retained message policy with other ActiveMQ policies
719            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
720            policy.setWrapped(recoveryPolicy);
721        } else {
722            this.subscriptionRecoveryPolicy = recoveryPolicy;
723        }
724    }
725
726    // Implementation methods
727    // -------------------------------------------------------------------------
728
729    @Override
730    public final void wakeup() {
731    }
732
733    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
734        // AMQ-2586: Better to leave this stat at zero than to give the user
735        // misleading metrics.
736        // destinationStatistics.getMessages().increment();
737        destinationStatistics.getEnqueues().increment();
738        destinationStatistics.getMessageSize().addSize(message.getSize());
739        MessageEvaluationContext msgContext = null;
740
741        dispatchLock.readLock().lock();
742        try {
743            if (!subscriptionRecoveryPolicy.add(context, message)) {
744                return;
745            }
746            synchronized (consumers) {
747                if (consumers.isEmpty()) {
748                    onMessageWithNoConsumers(context, message);
749                    return;
750                }
751            }
752            msgContext = context.getMessageEvaluationContext();
753            msgContext.setDestination(destination);
754            msgContext.setMessageReference(message);
755            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
756                onMessageWithNoConsumers(context, message);
757            }
758
759        } finally {
760            dispatchLock.readLock().unlock();
761            if (msgContext != null) {
762                msgContext.clear();
763            }
764        }
765    }
766
767    private final AtomicBoolean expiryTaskInProgress = new AtomicBoolean(false);
768    private final Runnable expireMessagesWork = new Runnable() {
769        @Override
770        public void run() {
771            List<Message> browsedMessages = new InsertionCountList<Message>();
772            doBrowse(browsedMessages, getMaxExpirePageSize());
773            expiryTaskInProgress.set(false);
774        }
775    };
776    private final Runnable expireMessagesTask = new Runnable() {
777        @Override
778        public void run() {
779            if (expiryTaskInProgress.compareAndSet(false, true)) {
780                taskRunnerFactor.execute(expireMessagesWork);
781            }
782        }
783    };
784
785    @Override
786    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
787        broker.messageExpired(context, reference, subs);
788        // AMQ-2586: Better to leave this stat at zero than to give the user
789        // misleading metrics.
790        // destinationStatistics.getMessages().decrement();
791        destinationStatistics.getExpired().increment();
792        MessageAck ack = new MessageAck();
793        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
794        ack.setDestination(destination);
795        ack.setMessageID(reference.getMessageId());
796        try {
797            if (subs instanceof DurableTopicSubscription) {
798                ((DurableTopicSubscription)subs).removePending(reference);
799            }
800            acknowledge(context, subs, ack, reference);
801        } catch (Exception e) {
802            LOG.error("Failed to remove expired Message from the store ", e);
803        }
804    }
805
806    @Override
807    protected Logger getLog() {
808        return LOG;
809    }
810
811    protected boolean isOptimizeStorage(){
812        boolean result = false;
813
814        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
815                result = true;
816                for (DurableTopicSubscription s : durableSubscribers.values()) {
817                    if (s.isActive()== false){
818                        result = false;
819                        break;
820                    }
821                    if (s.getPrefetchSize()==0){
822                        result = false;
823                        break;
824                    }
825                    if (s.isSlowConsumer()){
826                        result = false;
827                        break;
828                    }
829                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
830                        result = false;
831                        break;
832                    }
833                }
834        }
835        return result;
836    }
837
838    /**
839     * force a reread of the store - after transaction recovery completion
840     * @param pendingAdditionsCount
841     */
842    @Override
843    public void clearPendingMessages(int pendingAdditionsCount) {
844        dispatchLock.readLock().lock();
845        try {
846            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
847                clearPendingAndDispatch(durableTopicSubscription);
848            }
849        } finally {
850            dispatchLock.readLock().unlock();
851        }
852    }
853
854    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
855        synchronized (durableTopicSubscription.pendingLock) {
856            durableTopicSubscription.pending.clear();
857            try {
858                durableTopicSubscription.dispatchPending();
859            } catch (IOException exception) {
860                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
861                        durableTopicSubscription,
862                        destination,
863                        durableTopicSubscription.pending }, exception);
864            }
865        }
866    }
867
868    private void rollback(MessageId poisoned) {
869        dispatchLock.readLock().lock();
870        try {
871            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
872                durableTopicSubscription.getPending().rollback(poisoned);
873            }
874        } finally {
875            dispatchLock.readLock().unlock();
876        }
877    }
878
879    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
880        return durableSubscribers;
881    }
882}