001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.Destination;
021import org.apache.activemq.broker.region.Subscription;
022import org.apache.activemq.broker.region.Topic;
023import org.apache.activemq.command.Message;
024import org.apache.activemq.filter.BooleanExpression;
025import org.apache.activemq.filter.MessageEvaluationContext;
026import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
027import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
028import org.apache.activemq.selector.SelectorParser;
029import org.apache.activemq.util.LRUCache;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import java.io.IOException;
034import java.util.List;
035import java.util.Set;
036
037public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
038    private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
039    LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
040    private final SubQueueSelectorCacheBroker selectorCachePlugin;
041
042    public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
043        super(next, virtualTopic);
044        selectorCachePlugin = (SubQueueSelectorCacheBroker)
045                ((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
046    }
047
048    /**
049     * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to
050     * the virtual queues, hence there is no build up of unmatched messages on these destinations
051     */
052    @Override
053    protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException {
054        boolean matches = false;
055        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
056        msgContext.setDestination(dest.getActiveMQDestination());
057        msgContext.setMessageReference(message);
058        List<Subscription> subs = dest.getConsumers();
059        for (Subscription sub : subs) {
060            if (sub.matches(message, msgContext)) {
061                matches = true;
062                break;
063            }
064        }
065        if (matches == false) {
066            matches = tryMatchingCachedSubs(broker, dest, msgContext);
067        }
068        return matches;
069    }
070
071    private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
072        boolean matches = false;
073        LOG.debug("No active consumer match found. Will try cache if configured...");
074
075        if (selectorCachePlugin != null) {
076            final Set<String> selectors = selectorCachePlugin.getSelector(dest.getActiveMQDestination().getQualifiedName());
077            if (selectors != null) {
078                for (String selector : selectors) {
079                    try {
080                        final BooleanExpression expression = getExpression(selector);
081                        matches = expression.matches(msgContext);
082                        if (matches) {
083                            return true;
084                        }
085                    } catch (Exception e) {
086                        LOG.error(e.getMessage(), e);
087                    }
088                }
089            }
090        }
091        return matches;
092    }
093
094    private BooleanExpression getExpression(String selector) throws Exception{
095        BooleanExpression result;
096        synchronized(expressionCache){
097            result = expressionCache.get(selector);
098            if (result == null){
099                result = compileSelector(selector);
100                expressionCache.put(selector,result);
101            }
102        }
103        return result;
104    }
105
106    /**
107     * Pre-compile the JMS selector.
108     *
109     * @param selectorExpression The non-null JMS selector expression.
110     */
111    private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
112        return SelectorParser.parse(selectorExpression);
113    }
114}