001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.io.File;
020import java.io.FileInputStream;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.ObjectInputStream;
024import java.io.ObjectOutputStream;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import javax.management.JMException;
032import javax.management.ObjectName;
033
034import org.apache.activemq.advisory.AdvisorySupport;
035import org.apache.activemq.broker.Broker;
036import org.apache.activemq.broker.BrokerFilter;
037import org.apache.activemq.broker.BrokerService;
038import org.apache.activemq.broker.ConnectionContext;
039import org.apache.activemq.broker.jmx.AnnotatedMBean;
040import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
041import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView;
042import org.apache.activemq.broker.region.Subscription;
043import org.apache.activemq.command.ConsumerInfo;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * A plugin which allows the caching of the selector from a subscription queue.
049 * <p/>
050 * This stops the build-up of unwanted messages, especially when consumers may
051 * disconnect from time to time when using virtual destinations.
052 * <p/>
053 * This is influenced by code snippets developed by Maciej Rakowicz
054 *
055 * @see https://issues.apache.org/activemq/browse/AMQ-3004
056 * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
057 */
058public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
059    private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
060    public static final String MATCH_EVERYTHING = "TRUE";
061
062    /**
063     * The subscription's selector cache. We cache compiled expressions keyed
064     * by the target destination.
065     */
066    private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
067
068    private final File persistFile;
069    private boolean singleSelectorPerDestination = false;
070    private boolean ignoreWildcardSelectors = false;
071    private ObjectName objectName;
072
073    private boolean running = true;
074    private final Thread persistThread;
075    private long persistInterval = MAX_PERSIST_INTERVAL;
076    public static final long MAX_PERSIST_INTERVAL = 600000;
077    private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
078
079    /**
080     * Constructor
081     */
082    public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
083        super(next);
084        this.persistFile = persistFile;
085        LOG.info("Using persisted selector cache from[{}]", persistFile);
086
087        readCache();
088
089        persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
090        persistThread.start();
091        enableJmx();
092    }
093
094    private void enableJmx() {
095        BrokerService broker = getBrokerService();
096        if (broker.isUseJmx()) {
097            VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this);
098            try {
099                objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache");
100                LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString());
101                AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName);
102            } catch (Exception e) {
103                LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans.");
104            }
105
106        }
107    }
108
109    @Override
110    public void stop() throws Exception {
111        running = false;
112        if (persistThread != null) {
113            persistThread.interrupt();
114            persistThread.join();
115        } //if
116        unregisterMBeans();
117    }
118
119    private void unregisterMBeans() {
120        BrokerService broker = getBrokerService();
121        if (broker.isUseJmx() && this.objectName != null) {
122            try {
123                broker.getManagementContext().unregisterMBean(objectName);
124            } catch (JMException e) {
125                LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting...");
126            }
127        }
128    }
129
130    @Override
131    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
132        // don't track selectors for advisory topics
133        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
134            String destinationName = info.getDestination().getQualifiedName();
135            LOG.debug("Caching consumer selector [{}] on  '{}'", info.getSelector(), destinationName);
136
137            String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector();
138
139            if (!(ignoreWildcardSelectors && hasWildcards(selector))) {
140
141                Set<String> selectors = subSelectorCache.get(destinationName);
142                if (selectors == null) {
143                    selectors = Collections.synchronizedSet(new HashSet<String>());
144                } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) {
145                    // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector
146                    // here, we always allow that one. But only one true selector.
147                    boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING);
148                    selectors.clear();
149
150                    // put back the MATCH_EVERYTHING selector
151                    if (containsMatchEverything) {
152                        selectors.add(MATCH_EVERYTHING);
153                    }
154                }
155
156                LOG.debug("adding new selector: into cache " + selector);
157                selectors.add(selector);
158                LOG.debug("current selectors in cache: " + selectors);
159                subSelectorCache.put(destinationName, selectors);
160            }
161
162
163        }
164        return super.addConsumer(context, info);
165    }
166
167    // trivial check for SQL92/selector wildcards
168    private boolean hasWildcards(String selector) {
169        return selector.contains("%") || selector.contains("_");
170    }
171
172    @Override
173    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
174        if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
175
176            if (singleSelectorPerDestination) {
177                String destinationName = info.getDestination().getQualifiedName();
178                Set<String> selectors = subSelectorCache.get(destinationName);
179                if (info.getSelector() == null && selectors.size() > 1) {
180                    boolean removed = selectors.remove(MATCH_EVERYTHING);
181                    LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed);
182                }
183            }
184
185        }
186        super.removeConsumer(context, info);
187    }
188
189    private void readCache() {
190        if (persistFile != null && persistFile.exists()) {
191            try {
192                FileInputStream fis = new FileInputStream(persistFile);
193                try {
194                    ObjectInputStream in = new ObjectInputStream(fis);
195                    try {
196                        subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject();
197                    } catch (ClassNotFoundException ex) {
198                        LOG.error("Invalid selector cache data found. Please remove file.", ex);
199                    } finally {
200                        in.close();
201                    } //try
202                } finally {
203                    fis.close();
204                } //try
205            } catch (IOException ex) {
206                LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
207            } //try
208        } //if
209    }
210
211    /**
212     * Persist the selector cache.
213     */
214    private void persistCache() {
215        LOG.debug("Persisting selector cache....");
216        try {
217            FileOutputStream fos = new FileOutputStream(persistFile);
218            try {
219                ObjectOutputStream out = new ObjectOutputStream(fos);
220                try {
221                    out.writeObject(subSelectorCache);
222                } finally {
223                    out.flush();
224                    out.close();
225                } //try
226            } catch (IOException ex) {
227                LOG.error("Unable to persist selector cache", ex);
228            } finally {
229                fos.close();
230            } //try
231        } catch (IOException ex) {
232            LOG.error("Unable to access file[{}]", persistFile, ex);
233        } //try
234    }
235
236    /**
237     * @return The JMS selector for the specified {@code destination}
238     */
239    public Set<String> getSelector(final String destination) {
240        return subSelectorCache.get(destination);
241    }
242
243    /**
244     * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
245     *
246     * @see java.lang.Runnable#run()
247     */
248    @Override
249    public void run() {
250        while (running) {
251            try {
252                Thread.sleep(persistInterval);
253            } catch (InterruptedException ex) {
254            } //try
255
256            persistCache();
257        }
258    }
259
260    public boolean isSingleSelectorPerDestination() {
261        return singleSelectorPerDestination;
262    }
263
264    public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) {
265        this.singleSelectorPerDestination = singleSelectorPerDestination;
266    }
267
268    public Set<String> getSelectorsForDestination(String destinationName) {
269        if (subSelectorCache.containsKey(destinationName)) {
270            return new HashSet<String>(subSelectorCache.get(destinationName));
271        }
272
273        return Collections.EMPTY_SET;
274    }
275
276    public long getPersistInterval() {
277        return persistInterval;
278    }
279
280    public void setPersistInterval(long persistInterval) {
281        this.persistInterval = persistInterval;
282    }
283
284    public boolean deleteSelectorForDestination(String destinationName, String selector) {
285        if (subSelectorCache.containsKey(destinationName)) {
286            Set<String> cachedSelectors = subSelectorCache.get(destinationName);
287            return cachedSelectors.remove(selector);
288        }
289
290        return false;
291    }
292
293    public boolean deleteAllSelectorsForDestination(String destinationName) {
294        if (subSelectorCache.containsKey(destinationName)) {
295            Set<String> cachedSelectors = subSelectorCache.get(destinationName);
296            cachedSelectors.clear();
297        }
298        return true;
299    }
300
301    public boolean isIgnoreWildcardSelectors() {
302        return ignoreWildcardSelectors;
303    }
304
305    public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) {
306        this.ignoreWildcardSelectors = ignoreWildcardSelectors;
307    }
308}
309