001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import javax.jms.InvalidSelectorException;
026import javax.jms.JMSException;
027import javax.management.ObjectName;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ConsumerId;
033import org.apache.activemq.command.ConsumerInfo;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.filter.BooleanExpression;
036import org.apache.activemq.filter.DestinationFilter;
037import org.apache.activemq.filter.LogicExpression;
038import org.apache.activemq.filter.MessageEvaluationContext;
039import org.apache.activemq.filter.NoLocalExpression;
040import org.apache.activemq.selector.SelectorParser;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public abstract class AbstractSubscription implements Subscription {
045
046    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
047    protected Broker broker;
048    protected ConnectionContext context;
049    protected ConsumerInfo info;
050    protected final DestinationFilter destinationFilter;
051    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
052    protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
053
054    private boolean usePrefetchExtension = true;
055    private BooleanExpression selectorExpression;
056    private ObjectName objectName;
057    private int cursorMemoryHighWaterMark = 70;
058    private boolean slowConsumer;
059    private long lastAckTime;
060    private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
061
062    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
063        this.broker = broker;
064        this.context = context;
065        this.info = info;
066        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
067        this.selectorExpression = parseSelector(info);
068        this.lastAckTime = System.currentTimeMillis();
069    }
070
071    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
072        BooleanExpression rc = null;
073        if (info.getSelector() != null) {
074            rc = SelectorParser.parse(info.getSelector());
075        }
076        if (info.isNoLocal()) {
077            if (rc == null) {
078                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
079            } else {
080                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
081            }
082        }
083        if (info.getAdditionalPredicate() != null) {
084            if (rc == null) {
085                rc = info.getAdditionalPredicate();
086            } else {
087                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
088            }
089        }
090        return rc;
091    }
092
093    @Override
094    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
095        this.lastAckTime = System.currentTimeMillis();
096        subscriptionStatistics.getConsumedCount().increment();
097    }
098
099    @Override
100    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
101        ConsumerId targetConsumerId = node.getTargetConsumerId();
102        if (targetConsumerId != null) {
103            if (!targetConsumerId.equals(info.getConsumerId())) {
104                return false;
105            }
106        }
107        try {
108            return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
109        } catch (JMSException e) {
110            LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
111            return false;
112        }
113    }
114
115    @Override
116    public boolean isWildcard() {
117        return destinationFilter.isWildcard();
118    }
119
120    @Override
121    public boolean matches(ActiveMQDestination destination) {
122        return destinationFilter.matches(destination);
123    }
124
125    @Override
126    public void add(ConnectionContext context, Destination destination) throws Exception {
127        destinations.add(destination);
128    }
129
130    @Override
131    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
132        destinations.remove(destination);
133        return Collections.EMPTY_LIST;
134    }
135
136    @Override
137    public ConsumerInfo getConsumerInfo() {
138        return info;
139    }
140
141    @Override
142    public void gc() {
143    }
144
145    @Override
146    public ConnectionContext getContext() {
147        return context;
148    }
149
150    public ConsumerInfo getInfo() {
151        return info;
152    }
153
154    public BooleanExpression getSelectorExpression() {
155        return selectorExpression;
156    }
157
158    @Override
159    public String getSelector() {
160        return info.getSelector();
161    }
162
163    @Override
164    public void setSelector(String selector) throws InvalidSelectorException {
165        ConsumerInfo copy = info.copy();
166        copy.setSelector(selector);
167        BooleanExpression newSelector = parseSelector(copy);
168        // its valid so lets actually update it now
169        info.setSelector(selector);
170        this.selectorExpression = newSelector;
171    }
172
173    @Override
174    public ObjectName getObjectName() {
175        return objectName;
176    }
177
178    @Override
179    public void setObjectName(ObjectName objectName) {
180        this.objectName = objectName;
181    }
182
183    @Override
184    public int getPrefetchSize() {
185        return info.getPrefetchSize();
186    }
187
188    public boolean isUsePrefetchExtension() {
189        return usePrefetchExtension;
190    }
191
192    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
193        this.usePrefetchExtension = usePrefetchExtension;
194    }
195
196    public void setPrefetchSize(int newSize) {
197        info.setPrefetchSize(newSize);
198    }
199
200    @Override
201    public boolean isRecoveryRequired() {
202        return true;
203    }
204
205    @Override
206    public boolean isSlowConsumer() {
207        return slowConsumer;
208    }
209
210    public void setSlowConsumer(boolean val) {
211        slowConsumer = val;
212    }
213
214    @Override
215    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
216        boolean result = false;
217        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
218        try {
219            Destination regionDestination = (Destination) message.getRegionDestination();
220            msgContext.setDestination(regionDestination.getActiveMQDestination());
221            msgContext.setMessageReference(message);
222            result = matches(message, msgContext);
223            if (result) {
224                doAddRecoveredMessage(message);
225            }
226
227        } finally {
228            msgContext.clear();
229        }
230        return result;
231    }
232
233    @Override
234    public ActiveMQDestination getActiveMQDestination() {
235        return info != null ? info.getDestination() : null;
236    }
237
238    @Override
239    public boolean isBrowser() {
240        return info != null && info.isBrowser();
241    }
242
243    @Override
244    public long getInFlightMessageSize() {
245        return subscriptionStatistics.getInflightMessageSize().getTotalSize();
246    }
247
248    @Override
249    public int getInFlightUsage() {
250        if (info.getPrefetchSize() > 0) {
251            return (getInFlightSize() * 100)/info.getPrefetchSize();
252        }
253        return Integer.MAX_VALUE;
254    }
255
256    /**
257     * Add a destination
258     * @param destination
259     */
260    public void addDestination(Destination destination) {
261
262    }
263
264    /**
265     * Remove a destination
266     * @param destination
267     */
268    public void removeDestination(Destination destination) {
269
270    }
271
272    @Override
273    public int getCursorMemoryHighWaterMark(){
274        return this.cursorMemoryHighWaterMark;
275    }
276
277    @Override
278    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
279        this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
280    }
281
282    @Override
283    public int countBeforeFull() {
284        return getDispatchedQueueSize() - info.getPrefetchSize();
285    }
286
287    @Override
288    public void unmatched(MessageReference node) throws IOException {
289        // only durable topic subs have something to do here
290    }
291
292    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
293        add(message);
294    }
295
296    @Override
297    public long getTimeOfLastMessageAck() {
298        return lastAckTime;
299    }
300
301    public void setTimeOfLastMessageAck(long value) {
302        this.lastAckTime = value;
303    }
304
305    public long getConsumedCount(){
306        return subscriptionStatistics.getConsumedCount().getCount();
307    }
308
309    public void incrementConsumedCount(){
310        subscriptionStatistics.getConsumedCount().increment();
311    }
312
313    public void resetConsumedCount(){
314        subscriptionStatistics.getConsumedCount().reset();
315    }
316
317    @Override
318    public SubscriptionStatistics getSubscriptionStatistics() {
319        return subscriptionStatistics;
320    }
321
322    public void wakeupDestinationsForDispatch() {
323        for (Destination dest : destinations) {
324            dest.wakeup();
325        }
326    }
327
328    public AtomicInteger getPrefetchExtension() {
329        return this.prefetchExtension;
330    }
331
332    protected void contractPrefetchExtension(int amount) {
333        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
334            decrementPrefetchExtension(amount);
335        }
336    }
337
338    protected void expandPrefetchExtension(int amount) {
339        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
340            incrementPrefetchExtension(amount);
341        }
342    }
343
344    protected void decrementPrefetchExtension(int amount) {
345        while (true) {
346            int currentExtension = prefetchExtension.get();
347            int newExtension = Math.max(0, currentExtension - amount);
348            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
349                break;
350            }
351        }
352    }
353
354    private void incrementPrefetchExtension(int amount) {
355        while (true) {
356            int currentExtension = prefetchExtension.get();
357            int newExtension = Math.max(currentExtension, currentExtension + amount);
358            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
359                break;
360            }
361        }
362    }
363
364    public CopyOnWriteArrayList<Destination> getDestinations() {
365        return destinations;
366    }
367}