001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.util.Arrays;
020import java.util.List;
021
022import org.apache.activemq.broker.region.Destination;
023import org.apache.activemq.broker.region.Subscription;
024import org.apache.activemq.command.BrokerId;
025import org.apache.activemq.command.ConsumerInfo;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.NetworkBridgeFilter;
028import org.apache.activemq.filter.MessageEvaluationContext;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * implement conditional behavior for queue consumers, allows replaying back to
034 * origin if no consumers are present on the local broker after a configurable
035 * delay, irrespective of the TTL. Also allows rate limiting of messages
036 * through the network, useful for static includes
037 *
038 * @org.apache.xbean.XBean
039 */
040public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
041
042    boolean replayWhenNoConsumers = false;
043    int replayDelay = 0;
044    int rateLimit = 0;
045    int rateDuration = 1000;
046    private boolean selectorAware = false;
047
048    @Override
049    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
050        ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
051        filter.setNetworkBrokerId(remoteBrokerPath[0]);
052        filter.setMessageTTL(messageTTL);
053        filter.setConsumerTTL(consumerTTL);
054        filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
055        filter.setRateLimit(getRateLimit());
056        filter.setRateDuration(getRateDuration());
057        filter.setReplayDelay(getReplayDelay());
058        filter.setSelectorAware(isSelectorAware());
059        return filter;
060    }
061
062    public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
063        this.replayWhenNoConsumers = replayWhenNoConsumers;
064    }
065
066    public boolean isReplayWhenNoConsumers() {
067        return replayWhenNoConsumers;
068    }
069
070    public void setRateLimit(int rateLimit) {
071        this.rateLimit = rateLimit;
072    }
073
074    public int getRateLimit() {
075        return rateLimit;
076    }
077
078    public int getRateDuration() {
079        return rateDuration;
080    }
081
082    public void setRateDuration(int rateDuration) {
083        this.rateDuration = rateDuration;
084    }
085
086    public int getReplayDelay() {
087        return replayDelay;
088    }
089
090    public void setReplayDelay(int replayDelay) {
091        this.replayDelay = replayDelay;
092    }
093
094    public void setSelectorAware(boolean selectorAware) {
095        this.selectorAware = selectorAware;
096    }
097
098    public boolean isSelectorAware() {
099        return selectorAware;
100    }
101
102    private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
103        final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
104        private int rateLimit;
105        private int rateDuration = 1000;
106        private boolean allowReplayWhenNoConsumers = true;
107        private int replayDelay = 1000;
108
109        private int matchCount;
110        private long rateDurationEnd;
111        private boolean selectorAware = false;
112
113        @Override
114        protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
115            boolean match = true;
116            if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
117                // potential replay back to origin
118                match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
119
120                if (match) {
121                    LOG.trace("Replaying [{}] for [{}] back to origin in the absence of a local consumer", message.getMessageId(), message.getDestination());
122                } else {
123                    LOG.trace("Suppressing replay of [{}] for [{}] back to origin {}", new Object[]{ message.getMessageId(), message.getDestination(), Arrays.asList(message.getBrokerPath())} );
124                }
125
126            } else {
127                // use existing filter logic for topics and non replays
128                match = super.matchesForwardingFilter(message, mec);
129            }
130
131            if (match && rateLimitExceeded()) {
132                LOG.trace("Throttled network consumer rejecting [{}] for [{}] {}>{}/{}", new Object[]{
133                        message.getMessageId(), message.getDestination(), matchCount, rateLimit, rateDuration
134                });
135                match = false;
136            }
137
138            return match;
139        }
140
141        private boolean hasNotJustArrived(Message message) {
142            return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
143        }
144
145        private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
146            Destination regionDestination = (Destination) mec.getMessageReference().getRegionDestination();
147            List<Subscription> consumers = regionDestination.getConsumers();
148            for (Subscription sub : consumers) {
149                if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
150
151                    if (!isSelectorAware()) {
152                        LOG.trace("Not replaying [{}] for [{}] to origin due to existing local consumer: {}", new Object[]{
153                                message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
154                        });
155                        return false;
156
157                    } else {
158                        try {
159                            if (sub.matches(message, mec)) {
160                                LOG.trace("Not replaying [{}] for [{}] to origin due to existing selector matching local consumer: {}", new Object[]{
161                                        message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
162                                });
163                                return false;
164                            }
165                        } catch (Exception ignored) {}
166                    }
167                }
168            }
169            return true;
170        }
171
172        private boolean rateLimitExceeded() {
173            if (rateLimit == 0) {
174                return false;
175            }
176
177            if (rateDurationEnd < System.currentTimeMillis()) {
178                rateDurationEnd = System.currentTimeMillis() + rateDuration;
179                matchCount = 0;
180            }
181            return ++matchCount > rateLimit;
182        }
183
184        public void setReplayDelay(int replayDelay) {
185            this.replayDelay = replayDelay;
186        }
187
188        public void setRateLimit(int rateLimit) {
189            this.rateLimit = rateLimit;
190        }
191
192        public void setRateDuration(int rateDuration) {
193            this.rateDuration = rateDuration;
194        }
195
196        public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
197            this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
198        }
199
200        public void setSelectorAware(boolean selectorAware) {
201            this.selectorAware = selectorAware;
202        }
203
204        public boolean isSelectorAware() {
205            return selectorAware;
206        }
207    }
208}