001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.virtual; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.region.Destination; 021import org.apache.activemq.broker.region.Subscription; 022import org.apache.activemq.broker.region.Topic; 023import org.apache.activemq.command.Message; 024import org.apache.activemq.filter.BooleanExpression; 025import org.apache.activemq.filter.MessageEvaluationContext; 026import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 027import org.apache.activemq.plugin.SubQueueSelectorCacheBroker; 028import org.apache.activemq.selector.SelectorParser; 029import org.apache.activemq.util.LRUCache; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import java.io.IOException; 034import java.util.List; 035import java.util.Set; 036 037public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor { 038 private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class); 039 LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>(); 040 private final SubQueueSelectorCacheBroker selectorCachePlugin; 041 042 public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { 043 super(next, virtualTopic); 044 selectorCachePlugin = (SubQueueSelectorCacheBroker) 045 ((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class); 046 } 047 048 /** 049 * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to 050 * the virtual queues, hence there is no build up of unmatched messages on these destinations 051 */ 052 @Override 053 protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException { 054 boolean matches = false; 055 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 056 msgContext.setDestination(dest.getActiveMQDestination()); 057 msgContext.setMessageReference(message); 058 List<Subscription> subs = dest.getConsumers(); 059 for (Subscription sub : subs) { 060 if (sub.matches(message, msgContext)) { 061 matches = true; 062 break; 063 } 064 } 065 if (matches == false) { 066 matches = tryMatchingCachedSubs(broker, dest, msgContext); 067 } 068 return matches; 069 } 070 071 private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) { 072 boolean matches = false; 073 LOG.debug("No active consumer match found. Will try cache if configured..."); 074 075 if (selectorCachePlugin != null) { 076 final Set<String> selectors = selectorCachePlugin.getSelector(dest.getActiveMQDestination().getQualifiedName()); 077 if (selectors != null) { 078 for (String selector : selectors) { 079 try { 080 final BooleanExpression expression = getExpression(selector); 081 matches = expression.matches(msgContext); 082 if (matches) { 083 return true; 084 } 085 } catch (Exception e) { 086 LOG.error(e.getMessage(), e); 087 } 088 } 089 } 090 } 091 return matches; 092 } 093 094 private BooleanExpression getExpression(String selector) throws Exception{ 095 BooleanExpression result; 096 synchronized(expressionCache){ 097 result = expressionCache.get(selector); 098 if (result == null){ 099 result = compileSelector(selector); 100 expressionCache.put(selector,result); 101 } 102 } 103 return result; 104 } 105 106 /** 107 * Pre-compile the JMS selector. 108 * 109 * @param selectorExpression The non-null JMS selector expression. 110 */ 111 private BooleanExpression compileSelector(final String selectorExpression) throws Exception { 112 return SelectorParser.parse(selectorExpression); 113 } 114}