001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.io.File; 020import java.io.FileInputStream; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.ObjectInputStream; 024import java.io.ObjectOutputStream; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030 031import javax.management.JMException; 032import javax.management.ObjectName; 033 034import org.apache.activemq.advisory.AdvisorySupport; 035import org.apache.activemq.broker.Broker; 036import org.apache.activemq.broker.BrokerFilter; 037import org.apache.activemq.broker.BrokerService; 038import org.apache.activemq.broker.ConnectionContext; 039import org.apache.activemq.broker.jmx.AnnotatedMBean; 040import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 041import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView; 042import org.apache.activemq.broker.region.Subscription; 043import org.apache.activemq.command.ConsumerInfo; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * A plugin which allows the caching of the selector from a subscription queue. 049 * <p/> 050 * This stops the build-up of unwanted messages, especially when consumers may 051 * disconnect from time to time when using virtual destinations. 052 * <p/> 053 * This is influenced by code snippets developed by Maciej Rakowicz 054 * 055 * @see https://issues.apache.org/activemq/browse/AMQ-3004 056 * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E 057 */ 058public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable { 059 private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class); 060 public static final String MATCH_EVERYTHING = "TRUE"; 061 062 /** 063 * The subscription's selector cache. We cache compiled expressions keyed 064 * by the target destination. 065 */ 066 private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>(); 067 068 private final File persistFile; 069 private boolean singleSelectorPerDestination = false; 070 private boolean ignoreWildcardSelectors = false; 071 private ObjectName objectName; 072 073 private boolean running = true; 074 private final Thread persistThread; 075 private long persistInterval = MAX_PERSIST_INTERVAL; 076 public static final long MAX_PERSIST_INTERVAL = 600000; 077 private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread"; 078 079 /** 080 * Constructor 081 */ 082 public SubQueueSelectorCacheBroker(Broker next, final File persistFile) { 083 super(next); 084 this.persistFile = persistFile; 085 LOG.info("Using persisted selector cache from[{}]", persistFile); 086 087 readCache(); 088 089 persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME); 090 persistThread.start(); 091 enableJmx(); 092 } 093 094 private void enableJmx() { 095 BrokerService broker = getBrokerService(); 096 if (broker.isUseJmx()) { 097 VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this); 098 try { 099 objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); 100 LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString()); 101 AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName); 102 } catch (Exception e) { 103 LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans."); 104 } 105 106 } 107 } 108 109 @Override 110 public void stop() throws Exception { 111 running = false; 112 if (persistThread != null) { 113 persistThread.interrupt(); 114 persistThread.join(); 115 } //if 116 unregisterMBeans(); 117 } 118 119 private void unregisterMBeans() { 120 BrokerService broker = getBrokerService(); 121 if (broker.isUseJmx() && this.objectName != null) { 122 try { 123 broker.getManagementContext().unregisterMBean(objectName); 124 } catch (JMException e) { 125 LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting..."); 126 } 127 } 128 } 129 130 @Override 131 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 132 // don't track selectors for advisory topics 133 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 134 String destinationName = info.getDestination().getQualifiedName(); 135 LOG.debug("Caching consumer selector [{}] on '{}'", info.getSelector(), destinationName); 136 137 String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector(); 138 139 if (!(ignoreWildcardSelectors && hasWildcards(selector))) { 140 141 Set<String> selectors = subSelectorCache.get(destinationName); 142 if (selectors == null) { 143 selectors = Collections.synchronizedSet(new HashSet<String>()); 144 } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) { 145 // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector 146 // here, we always allow that one. But only one true selector. 147 boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING); 148 selectors.clear(); 149 150 // put back the MATCH_EVERYTHING selector 151 if (containsMatchEverything) { 152 selectors.add(MATCH_EVERYTHING); 153 } 154 } 155 156 LOG.debug("adding new selector: into cache " + selector); 157 selectors.add(selector); 158 LOG.debug("current selectors in cache: " + selectors); 159 subSelectorCache.put(destinationName, selectors); 160 } 161 162 163 } 164 return super.addConsumer(context, info); 165 } 166 167 // trivial check for SQL92/selector wildcards 168 private boolean hasWildcards(String selector) { 169 return selector.contains("%") || selector.contains("_"); 170 } 171 172 @Override 173 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 174 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 175 176 if (singleSelectorPerDestination) { 177 String destinationName = info.getDestination().getQualifiedName(); 178 Set<String> selectors = subSelectorCache.get(destinationName); 179 if (info.getSelector() == null && selectors.size() > 1) { 180 boolean removed = selectors.remove(MATCH_EVERYTHING); 181 LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed); 182 } 183 } 184 185 } 186 super.removeConsumer(context, info); 187 } 188 189 private void readCache() { 190 if (persistFile != null && persistFile.exists()) { 191 try { 192 FileInputStream fis = new FileInputStream(persistFile); 193 try { 194 ObjectInputStream in = new ObjectInputStream(fis); 195 try { 196 subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject(); 197 } catch (ClassNotFoundException ex) { 198 LOG.error("Invalid selector cache data found. Please remove file.", ex); 199 } finally { 200 in.close(); 201 } //try 202 } finally { 203 fis.close(); 204 } //try 205 } catch (IOException ex) { 206 LOG.error("Unable to read persisted selector cache...it will be ignored!", ex); 207 } //try 208 } //if 209 } 210 211 /** 212 * Persist the selector cache. 213 */ 214 private void persistCache() { 215 LOG.debug("Persisting selector cache...."); 216 try { 217 FileOutputStream fos = new FileOutputStream(persistFile); 218 try { 219 ObjectOutputStream out = new ObjectOutputStream(fos); 220 try { 221 out.writeObject(subSelectorCache); 222 } finally { 223 out.flush(); 224 out.close(); 225 } //try 226 } catch (IOException ex) { 227 LOG.error("Unable to persist selector cache", ex); 228 } finally { 229 fos.close(); 230 } //try 231 } catch (IOException ex) { 232 LOG.error("Unable to access file[{}]", persistFile, ex); 233 } //try 234 } 235 236 /** 237 * @return The JMS selector for the specified {@code destination} 238 */ 239 public Set<String> getSelector(final String destination) { 240 return subSelectorCache.get(destination); 241 } 242 243 /** 244 * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms. 245 * 246 * @see java.lang.Runnable#run() 247 */ 248 @Override 249 public void run() { 250 while (running) { 251 try { 252 Thread.sleep(persistInterval); 253 } catch (InterruptedException ex) { 254 } //try 255 256 persistCache(); 257 } 258 } 259 260 public boolean isSingleSelectorPerDestination() { 261 return singleSelectorPerDestination; 262 } 263 264 public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) { 265 this.singleSelectorPerDestination = singleSelectorPerDestination; 266 } 267 268 public Set<String> getSelectorsForDestination(String destinationName) { 269 if (subSelectorCache.containsKey(destinationName)) { 270 return new HashSet<String>(subSelectorCache.get(destinationName)); 271 } 272 273 return Collections.EMPTY_SET; 274 } 275 276 public long getPersistInterval() { 277 return persistInterval; 278 } 279 280 public void setPersistInterval(long persistInterval) { 281 this.persistInterval = persistInterval; 282 } 283 284 public boolean deleteSelectorForDestination(String destinationName, String selector) { 285 if (subSelectorCache.containsKey(destinationName)) { 286 Set<String> cachedSelectors = subSelectorCache.get(destinationName); 287 return cachedSelectors.remove(selector); 288 } 289 290 return false; 291 } 292 293 public boolean deleteAllSelectorsForDestination(String destinationName) { 294 if (subSelectorCache.containsKey(destinationName)) { 295 Set<String> cachedSelectors = subSelectorCache.get(destinationName); 296 cachedSelectors.clear(); 297 } 298 return true; 299 } 300 301 public boolean isIgnoreWildcardSelectors() { 302 return ignoreWildcardSelectors; 303 } 304 305 public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) { 306 this.ignoreWildcardSelectors = ignoreWildcardSelectors; 307 } 308} 309