001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import org.apache.activemq.advisory.AdvisorySupport;
020import org.apache.activemq.filter.BooleanExpression;
021import org.apache.activemq.filter.MessageEvaluationContext;
022import org.apache.activemq.util.JMSExceptionSupport;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026import javax.jms.JMSException;
027import java.io.IOException;
028import java.util.Arrays;
029
030/**
031 * @openwire:marshaller code="91"
032 *
033 */
034public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
035
036    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
037    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
038
039    protected BrokerId networkBrokerId;
040    protected int messageTTL;
041    protected int consumerTTL;
042    transient ConsumerInfo consumerInfo;
043
044    public NetworkBridgeFilter() {
045    }
046
047    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
048        this.networkBrokerId = networkBrokerId;
049        this.messageTTL = messageTTL;
050        this.consumerTTL = consumerTTL;
051        this.consumerInfo = consumerInfo;
052    }
053
054    public byte getDataStructureType() {
055        return DATA_STRUCTURE_TYPE;
056    }
057
058    public boolean isMarshallAware() {
059        return false;
060    }
061
062    public boolean matches(MessageEvaluationContext mec) throws JMSException {
063        try {
064            // for Queues - the message can be acknowledged and dropped whilst
065            // still
066            // in the dispatch loop
067            // so need to get the reference to it
068            Message message = mec.getMessage();
069            return message != null && matchesForwardingFilter(message, mec);
070        } catch (IOException e) {
071            throw JMSExceptionSupport.create(e);
072        }
073    }
074
075    public Object evaluate(MessageEvaluationContext message) throws JMSException {
076        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
077    }
078
079    protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
080
081        if (contains(message.getBrokerPath(), networkBrokerId)) {
082            if (LOG.isTraceEnabled()) {
083                LOG.trace("Message all ready routed once through target broker ("
084                        + networkBrokerId + "), path: "
085                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
086            }
087            return false;
088        }
089
090        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
091
092        if (messageTTL > -1 && hops >= messageTTL) {
093            if (LOG.isTraceEnabled()) {
094                LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
095            }
096            return false;
097        }
098
099        if (message.isAdvisory()) {
100            if (consumerInfo != null && consumerInfo.isNetworkSubscription() && isAdvisoryInterpretedByNetworkBridge(message)) {
101                // they will be interpreted by the bridge leading to dup commands
102                if (LOG.isTraceEnabled()) {
103                    LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
104                }
105                return false;
106            } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
107                ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
108                hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
109                if (consumerTTL > -1 && hops >= consumerTTL) {
110                    if (LOG.isTraceEnabled()) {
111                        LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
112                    }
113                    return false;
114                }
115
116                if (contains(info.getBrokerPath(), networkBrokerId)) {
117                    LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
118                            + networkBrokerId + "), path: "
119                            + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
120                    return false;
121                }
122            }
123        }
124        return true;
125    }
126
127    public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
128        return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
129    }
130
131    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
132        if (brokerPath != null && brokerId != null) {
133            for (int i = 0; i < brokerPath.length; i++) {
134                if (brokerId.equals(brokerPath[i])) {
135                    return true;
136                }
137            }
138        }
139        return false;
140    }
141
142    // keep for backward compat with older
143    // wire formats
144    public int getNetworkTTL() {
145        return messageTTL;
146    }
147
148    public void setNetworkTTL(int networkTTL) {
149        messageTTL = networkTTL;
150        consumerTTL = networkTTL;
151    }
152
153    /**
154     * @openwire:property version=1 cache=true
155     */
156    public BrokerId getNetworkBrokerId() {
157        return networkBrokerId;
158    }
159
160    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
161        this.networkBrokerId = remoteBrokerPath;
162    }
163
164    public void setMessageTTL(int messageTTL) {
165        this.messageTTL = messageTTL;
166    }
167
168    /**
169     * @openwire:property version=10
170     */
171    public int getMessageTTL() {
172        return this.messageTTL;
173    }
174
175    public void setConsumerTTL(int consumerTTL) {
176        this.consumerTTL = consumerTTL;
177    }
178
179    /**
180     * @openwire:property version=10
181     */
182    public int getConsumerTTL() {
183        return this.consumerTTL;
184    }
185}