001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ProducerBrokerExchange;
035import org.apache.activemq.broker.region.policy.DispatchPolicy;
036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
040import org.apache.activemq.broker.util.InsertionCountList;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ExceptionResponse;
043import org.apache.activemq.command.Message;
044import org.apache.activemq.command.MessageAck;
045import org.apache.activemq.command.MessageId;
046import org.apache.activemq.command.ProducerAck;
047import org.apache.activemq.command.ProducerInfo;
048import org.apache.activemq.command.Response;
049import org.apache.activemq.command.SubscriptionInfo;
050import org.apache.activemq.filter.MessageEvaluationContext;
051import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
052import org.apache.activemq.store.MessageRecoveryListener;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.thread.Task;
055import org.apache.activemq.thread.TaskRunner;
056import org.apache.activemq.thread.TaskRunnerFactory;
057import org.apache.activemq.transaction.Synchronization;
058import org.apache.activemq.util.SubscriptionKey;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * The Topic is a destination that sends a copy of a message to every active
064 * Subscription registered.
065 */
066public class Topic extends BaseDestination implements Task {
067    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
068    private final TopicMessageStore topicStore;
069    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
070    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
071    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
072    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
073    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
074    private final TaskRunner taskRunner;
075    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
076    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
077        @Override
078        public void run() {
079            try {
080                Topic.this.taskRunner.wakeup();
081            } catch (InterruptedException e) {
082            }
083        }
084    };
085
086    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
087            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
088        super(brokerService, store, destination, parentStats);
089        this.topicStore = store;
090        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
091        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
092    }
093
094    @Override
095    public void initialize() throws Exception {
096        super.initialize();
097        // set non default subscription recovery policy (override policyEntries)
098        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
099            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
100            setAlwaysRetroactive(true);
101        }
102        if (store != null) {
103            // AMQ-2586: Better to leave this stat at zero than to give the user
104            // misleading metrics.
105            // int messageCount = store.getMessageCount();
106            // destinationStatistics.getMessages().setCount(messageCount);
107        }
108    }
109
110    @Override
111    public List<Subscription> getConsumers() {
112        synchronized (consumers) {
113            return new ArrayList<Subscription>(consumers);
114        }
115    }
116
117    public boolean lock(MessageReference node, LockOwner sub) {
118        return true;
119    }
120
121    @Override
122    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
123        if (!sub.getConsumerInfo().isDurable()) {
124
125            // Do a retroactive recovery if needed.
126            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
127
128                // synchronize with dispatch method so that no new messages are sent
129                // while we are recovering a subscription to avoid out of order messages.
130                dispatchLock.writeLock().lock();
131                try {
132                    boolean applyRecovery = false;
133                    synchronized (consumers) {
134                        if (!consumers.contains(sub)){
135                            sub.add(context, this);
136                            consumers.add(sub);
137                            applyRecovery=true;
138                            super.addSubscription(context, sub);
139                        }
140                    }
141                    if (applyRecovery){
142                        subscriptionRecoveryPolicy.recover(context, this, sub);
143                    }
144                } finally {
145                    dispatchLock.writeLock().unlock();
146                }
147
148            } else {
149                synchronized (consumers) {
150                    if (!consumers.contains(sub)){
151                        sub.add(context, this);
152                        consumers.add(sub);
153                        super.addSubscription(context, sub);
154                    }
155                }
156            }
157        } else {
158            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
159            super.addSubscription(context, sub);
160            sub.add(context, this);
161            if(dsub.isActive()) {
162                synchronized (consumers) {
163                    boolean hasSubscription = false;
164
165                    if (consumers.size() == 0) {
166                        hasSubscription = false;
167                    } else {
168                        for (Subscription currentSub : consumers) {
169                            if (currentSub.getConsumerInfo().isDurable()) {
170                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
171                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
172                                    hasSubscription = true;
173                                    break;
174                                }
175                            }
176                        }
177                    }
178
179                    if (!hasSubscription) {
180                        consumers.add(sub);
181                    }
182                }
183            }
184            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
185        }
186    }
187
188    @Override
189    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
190        if (!sub.getConsumerInfo().isDurable()) {
191            boolean removed = false;
192            synchronized (consumers) {
193                removed = consumers.remove(sub);
194            }
195            if (removed) {
196                super.removeSubscription(context, sub, lastDeliveredSequenceId);
197            }
198        }
199        sub.remove(context, this);
200    }
201
202    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
203        if (topicStore != null) {
204            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
205            DurableTopicSubscription removed = durableSubscribers.remove(key);
206            if (removed != null) {
207                destinationStatistics.getConsumers().decrement();
208                // deactivate and remove
209                removed.deactivate(false, 0l);
210                consumers.remove(removed);
211            }
212        }
213    }
214
215    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
216        // synchronize with dispatch method so that no new messages are sent
217        // while we are recovering a subscription to avoid out of order messages.
218        dispatchLock.writeLock().lock();
219        try {
220
221            if (topicStore == null) {
222                return;
223            }
224
225            // Recover the durable subscription.
226            String clientId = subscription.getSubscriptionKey().getClientId();
227            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
228            String selector = subscription.getConsumerInfo().getSelector();
229            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
230            if (info != null) {
231                // Check to see if selector changed.
232                String s1 = info.getSelector();
233                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
234                    // Need to delete the subscription
235                    topicStore.deleteSubscription(clientId, subscriptionName);
236                    info = null;
237                    synchronized (consumers) {
238                        consumers.remove(subscription);
239                    }
240                } else {
241                    synchronized (consumers) {
242                        if (!consumers.contains(subscription)) {
243                            consumers.add(subscription);
244                        }
245                    }
246                }
247            }
248
249            // Do we need to create the subscription?
250            if (info == null) {
251                info = new SubscriptionInfo();
252                info.setClientId(clientId);
253                info.setSelector(selector);
254                info.setSubscriptionName(subscriptionName);
255                info.setDestination(getActiveMQDestination());
256                // This destination is an actual destination id.
257                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
258                // This destination might be a pattern
259                synchronized (consumers) {
260                    consumers.add(subscription);
261                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
262                }
263            }
264
265            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
266            msgContext.setDestination(destination);
267            if (subscription.isRecoveryRequired()) {
268                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
269                    @Override
270                    public boolean recoverMessage(Message message) throws Exception {
271                        message.setRegionDestination(Topic.this);
272                        try {
273                            msgContext.setMessageReference(message);
274                            if (subscription.matches(message, msgContext)) {
275                                subscription.add(message);
276                            }
277                        } catch (IOException e) {
278                            LOG.error("Failed to recover this message {}", message, e);
279                        }
280                        return true;
281                    }
282
283                    @Override
284                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
285                        throw new RuntimeException("Should not be called.");
286                    }
287
288                    @Override
289                    public boolean hasSpace() {
290                        return true;
291                    }
292
293                    @Override
294                    public boolean isDuplicate(MessageId id) {
295                        return false;
296                    }
297                });
298            }
299        } finally {
300            dispatchLock.writeLock().unlock();
301        }
302    }
303
304    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
305        synchronized (consumers) {
306            consumers.remove(sub);
307        }
308        sub.remove(context, this, dispatched);
309    }
310
311    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
312        if (subscription.getConsumerInfo().isRetroactive()) {
313            subscriptionRecoveryPolicy.recover(context, this, subscription);
314        }
315    }
316
317    @Override
318    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
319        final ConnectionContext context = producerExchange.getConnectionContext();
320
321        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
322        producerExchange.incrementSend();
323        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
324                && !context.isInRecoveryMode();
325
326        message.setRegionDestination(this);
327
328        // There is delay between the client sending it and it arriving at the
329        // destination.. it may have expired.
330        if (message.isExpired()) {
331            broker.messageExpired(context, message, null);
332            getDestinationStatistics().getExpired().increment();
333            if (sendProducerAck) {
334                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
335                context.getConnection().dispatchAsync(ack);
336            }
337            return;
338        }
339
340        if (memoryUsage.isFull()) {
341            isFull(context, memoryUsage);
342            fastProducer(context, producerInfo);
343
344            if (isProducerFlowControl() && context.isProducerFlowControl()) {
345
346                if (isFlowControlLogRequired()) {
347                    LOG.warn("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
348                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
349                } else {
350                    LOG.debug("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
351                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
352                }
353
354                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
355                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
356                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
357                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
358                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
359                }
360
361                // We can avoid blocking due to low usage if the producer is sending a sync message or
362                // if it is using a producer window
363                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
364                    synchronized (messagesWaitingForSpace) {
365                        messagesWaitingForSpace.add(new Runnable() {
366                            @Override
367                            public void run() {
368                                try {
369
370                                    // While waiting for space to free up... the
371                                    // message may have expired.
372                                    if (message.isExpired()) {
373                                        broker.messageExpired(context, message, null);
374                                        getDestinationStatistics().getExpired().increment();
375                                    } else {
376                                        doMessageSend(producerExchange, message);
377                                    }
378
379                                    if (sendProducerAck) {
380                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
381                                                .getSize());
382                                        context.getConnection().dispatchAsync(ack);
383                                    } else {
384                                        Response response = new Response();
385                                        response.setCorrelationId(message.getCommandId());
386                                        context.getConnection().dispatchAsync(response);
387                                    }
388
389                                } catch (Exception e) {
390                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
391                                        ExceptionResponse response = new ExceptionResponse(e);
392                                        response.setCorrelationId(message.getCommandId());
393                                        context.getConnection().dispatchAsync(response);
394                                    }
395                                }
396                            }
397                        });
398
399                        registerCallbackForNotFullNotification();
400                        context.setDontSendReponse(true);
401                        return;
402                    }
403
404                } else {
405                    // Producer flow control cannot be used, so we have do the flow control
406                    // at the broker by blocking this thread until there is space available.
407
408                    if (memoryUsage.isFull()) {
409                        if (context.isInTransaction()) {
410
411                            int count = 0;
412                            while (!memoryUsage.waitForSpace(1000)) {
413                                if (context.getStopping().get()) {
414                                    throw new IOException("Connection closed, send aborted.");
415                                }
416                                if (count > 2 && context.isInTransaction()) {
417                                    count = 0;
418                                    int size = context.getTransaction().size();
419                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
420                                }
421                                count++;
422                            }
423                        } else {
424                            waitForSpace(
425                                    context,
426                                    producerExchange,
427                                    memoryUsage,
428                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
429                                            + message.getProducerId()
430                                            + ") to prevent flooding "
431                                            + getActiveMQDestination().getQualifiedName()
432                                            + "."
433                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
434                        }
435                    }
436
437                    // The usage manager could have delayed us by the time
438                    // we unblock the message could have expired..
439                    if (message.isExpired()) {
440                        getDestinationStatistics().getExpired().increment();
441                        LOG.debug("Expired message: {}", message);
442                        return;
443                    }
444                }
445            }
446        }
447
448        doMessageSend(producerExchange, message);
449        messageDelivered(context, message);
450        if (sendProducerAck) {
451            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
452            context.getConnection().dispatchAsync(ack);
453        }
454    }
455
456    /**
457     * do send the message - this needs to be synchronized to ensure messages
458     * are stored AND dispatched in the right order
459     *
460     * @param producerExchange
461     * @param message
462     * @throws IOException
463     * @throws Exception
464     */
465    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
466            throws IOException, Exception {
467        final ConnectionContext context = producerExchange.getConnectionContext();
468        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
469        Future<Object> result = null;
470
471        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
472            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
473                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
474                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
475                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
476                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
477                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
478                    throw new javax.jms.ResourceAllocationException(logMessage);
479                }
480
481                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
482            }
483            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
484        }
485
486        message.incrementReferenceCount();
487
488        if (context.isInTransaction()) {
489            context.getTransaction().addSynchronization(new Synchronization() {
490                @Override
491                public void afterCommit() throws Exception {
492                    // It could take while before we receive the commit
493                    // operation.. by that time the message could have
494                    // expired..
495                    if (message.isExpired()) {
496                        if (broker.isExpired(message)) {
497                            getDestinationStatistics().getExpired().increment();
498                            broker.messageExpired(context, message, null);
499                        }
500                        message.decrementReferenceCount();
501                        return;
502                    }
503                    try {
504                        dispatch(context, message);
505                    } finally {
506                        message.decrementReferenceCount();
507                    }
508                }
509
510                @Override
511                public void afterRollback() throws Exception {
512                    message.decrementReferenceCount();
513                }
514            });
515
516        } else {
517            try {
518                dispatch(context, message);
519            } finally {
520                message.decrementReferenceCount();
521            }
522        }
523
524        if (result != null && !result.isCancelled()) {
525            try {
526                result.get();
527            } catch (CancellationException e) {
528                // ignore - the task has been cancelled if the message
529                // has already been deleted
530            }
531        }
532    }
533
534    private boolean canOptimizeOutPersistence() {
535        return durableSubscribers.size() == 0;
536    }
537
538    @Override
539    public String toString() {
540        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
541    }
542
543    @Override
544    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
545            final MessageReference node) throws IOException {
546        if (topicStore != null && node.isPersistent()) {
547            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
548            SubscriptionKey key = dsub.getSubscriptionKey();
549            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
550                    convertToNonRangedAck(ack, node));
551        }
552        messageConsumed(context, node);
553    }
554
555    @Override
556    public void gc() {
557    }
558
559    public Message loadMessage(MessageId messageId) throws IOException {
560        return topicStore != null ? topicStore.getMessage(messageId) : null;
561    }
562
563    @Override
564    public void start() throws Exception {
565        if (started.compareAndSet(false, true)) {
566            this.subscriptionRecoveryPolicy.start();
567            if (memoryUsage != null) {
568                memoryUsage.start();
569            }
570
571            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
572                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
573            }
574        }
575    }
576
577    @Override
578    public void stop() throws Exception {
579        if (started.compareAndSet(true, false)) {
580            if (taskRunner != null) {
581                taskRunner.shutdown();
582            }
583            this.subscriptionRecoveryPolicy.stop();
584            if (memoryUsage != null) {
585                memoryUsage.stop();
586            }
587            if (this.topicStore != null) {
588                this.topicStore.stop();
589            }
590
591            scheduler.cancel(expireMessagesTask);
592        }
593    }
594
595    @Override
596    public Message[] browse() {
597        final List<Message> result = new ArrayList<Message>();
598        doBrowse(result, getMaxBrowsePageSize());
599        return result.toArray(new Message[result.size()]);
600    }
601
602    private void doBrowse(final List<Message> browseList, final int max) {
603        try {
604            if (topicStore != null) {
605                final List<Message> toExpire = new ArrayList<Message>();
606                topicStore.recover(new MessageRecoveryListener() {
607                    @Override
608                    public boolean recoverMessage(Message message) throws Exception {
609                        if (message.isExpired()) {
610                            toExpire.add(message);
611                        }
612                        browseList.add(message);
613                        return true;
614                    }
615
616                    @Override
617                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
618                        return true;
619                    }
620
621                    @Override
622                    public boolean hasSpace() {
623                        return browseList.size() < max;
624                    }
625
626                    @Override
627                    public boolean isDuplicate(MessageId id) {
628                        return false;
629                    }
630                });
631                final ConnectionContext connectionContext = createConnectionContext();
632                for (Message message : toExpire) {
633                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
634                        if (!sub.isActive()) {
635                            message.setRegionDestination(this);
636                            messageExpired(connectionContext, sub, message);
637                        }
638                    }
639                }
640                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
641                if (msgs != null) {
642                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
643                        browseList.add(msgs[i]);
644                    }
645                }
646            }
647        } catch (Throwable e) {
648            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
649        }
650    }
651
652    @Override
653    public boolean iterate() {
654        synchronized (messagesWaitingForSpace) {
655            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
656                Runnable op = messagesWaitingForSpace.removeFirst();
657                op.run();
658            }
659
660            if (!messagesWaitingForSpace.isEmpty()) {
661                registerCallbackForNotFullNotification();
662            }
663        }
664        return false;
665    }
666
667    private void registerCallbackForNotFullNotification() {
668        // If the usage manager is not full, then the task will not
669        // get called..
670        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
671            // so call it directly here.
672            sendMessagesWaitingForSpaceTask.run();
673        }
674    }
675
676    // Properties
677    // -------------------------------------------------------------------------
678
679    public DispatchPolicy getDispatchPolicy() {
680        return dispatchPolicy;
681    }
682
683    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
684        this.dispatchPolicy = dispatchPolicy;
685    }
686
687    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
688        return subscriptionRecoveryPolicy;
689    }
690
691    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
692        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
693            // allow users to combine retained message policy with other ActiveMQ policies
694            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
695            policy.setWrapped(recoveryPolicy);
696        } else {
697            this.subscriptionRecoveryPolicy = recoveryPolicy;
698        }
699    }
700
701    // Implementation methods
702    // -------------------------------------------------------------------------
703
704    @Override
705    public final void wakeup() {
706    }
707
708    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
709        // AMQ-2586: Better to leave this stat at zero than to give the user
710        // misleading metrics.
711        // destinationStatistics.getMessages().increment();
712        destinationStatistics.getEnqueues().increment();
713        destinationStatistics.getMessageSize().addSize(message.getSize());
714        MessageEvaluationContext msgContext = null;
715
716        dispatchLock.readLock().lock();
717        try {
718            if (!subscriptionRecoveryPolicy.add(context, message)) {
719                return;
720            }
721            synchronized (consumers) {
722                if (consumers.isEmpty()) {
723                    onMessageWithNoConsumers(context, message);
724                    return;
725                }
726            }
727            msgContext = context.getMessageEvaluationContext();
728            msgContext.setDestination(destination);
729            msgContext.setMessageReference(message);
730            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
731                onMessageWithNoConsumers(context, message);
732            }
733
734        } finally {
735            dispatchLock.readLock().unlock();
736            if (msgContext != null) {
737                msgContext.clear();
738            }
739        }
740    }
741
742    private final Runnable expireMessagesTask = new Runnable() {
743        @Override
744        public void run() {
745            List<Message> browsedMessages = new InsertionCountList<Message>();
746            doBrowse(browsedMessages, getMaxExpirePageSize());
747        }
748    };
749
750    @Override
751    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
752        broker.messageExpired(context, reference, subs);
753        // AMQ-2586: Better to leave this stat at zero than to give the user
754        // misleading metrics.
755        // destinationStatistics.getMessages().decrement();
756        destinationStatistics.getExpired().increment();
757        MessageAck ack = new MessageAck();
758        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
759        ack.setDestination(destination);
760        ack.setMessageID(reference.getMessageId());
761        try {
762            if (subs instanceof DurableTopicSubscription) {
763                ((DurableTopicSubscription)subs).removePending(reference);
764            }
765            acknowledge(context, subs, ack, reference);
766        } catch (Exception e) {
767            LOG.error("Failed to remove expired Message from the store ", e);
768        }
769    }
770
771    @Override
772    protected Logger getLog() {
773        return LOG;
774    }
775
776    protected boolean isOptimizeStorage(){
777        boolean result = false;
778
779        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
780                result = true;
781                for (DurableTopicSubscription s : durableSubscribers.values()) {
782                    if (s.isActive()== false){
783                        result = false;
784                        break;
785                    }
786                    if (s.getPrefetchSize()==0){
787                        result = false;
788                        break;
789                    }
790                    if (s.isSlowConsumer()){
791                        result = false;
792                        break;
793                    }
794                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
795                        result = false;
796                        break;
797                    }
798                }
799        }
800        return result;
801    }
802
803    /**
804     * force a reread of the store - after transaction recovery completion
805     * @param pendingAdditionsCount
806     */
807    @Override
808    public void clearPendingMessages(int pendingAdditionsCount) {
809        dispatchLock.readLock().lock();
810        try {
811            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
812                clearPendingAndDispatch(durableTopicSubscription);
813            }
814        } finally {
815            dispatchLock.readLock().unlock();
816        }
817    }
818
819    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
820        synchronized (durableTopicSubscription.pendingLock) {
821            durableTopicSubscription.pending.clear();
822            try {
823                durableTopicSubscription.dispatchPending();
824            } catch (IOException exception) {
825                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
826                        durableTopicSubscription,
827                        destination,
828                        durableTopicSubscription.pending }, exception);
829            }
830        }
831    }
832
833    private void rollback(MessageId poisoned) {
834        dispatchLock.readLock().lock();
835        try {
836            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
837                durableTopicSubscription.getPending().rollback(poisoned);
838            }
839        } finally {
840            dispatchLock.readLock().unlock();
841        }
842    }
843
844    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
845        return durableSubscribers;
846    }
847}