001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import org.apache.activemq.advisory.AdvisorySupport; 020import org.apache.activemq.filter.BooleanExpression; 021import org.apache.activemq.filter.MessageEvaluationContext; 022import org.apache.activemq.util.JMSExceptionSupport; 023import org.slf4j.Logger; 024import org.slf4j.LoggerFactory; 025 026import javax.jms.JMSException; 027import java.io.IOException; 028import java.util.Arrays; 029 030/** 031 * @openwire:marshaller code="91" 032 * 033 */ 034public class NetworkBridgeFilter implements DataStructure, BooleanExpression { 035 036 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER; 037 static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class); 038 039 protected BrokerId networkBrokerId; 040 protected int messageTTL; 041 protected int consumerTTL; 042 transient ConsumerInfo consumerInfo; 043 044 public NetworkBridgeFilter() { 045 } 046 047 public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) { 048 this.networkBrokerId = networkBrokerId; 049 this.messageTTL = messageTTL; 050 this.consumerTTL = consumerTTL; 051 this.consumerInfo = consumerInfo; 052 } 053 054 public byte getDataStructureType() { 055 return DATA_STRUCTURE_TYPE; 056 } 057 058 public boolean isMarshallAware() { 059 return false; 060 } 061 062 public boolean matches(MessageEvaluationContext mec) throws JMSException { 063 try { 064 // for Queues - the message can be acknowledged and dropped whilst 065 // still 066 // in the dispatch loop 067 // so need to get the reference to it 068 Message message = mec.getMessage(); 069 return message != null && matchesForwardingFilter(message, mec); 070 } catch (IOException e) { 071 throw JMSExceptionSupport.create(e); 072 } 073 } 074 075 public Object evaluate(MessageEvaluationContext message) throws JMSException { 076 return matches(message) ? Boolean.TRUE : Boolean.FALSE; 077 } 078 079 protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) { 080 081 if (contains(message.getBrokerPath(), networkBrokerId)) { 082 if (LOG.isTraceEnabled()) { 083 LOG.trace("Message all ready routed once through target broker (" 084 + networkBrokerId + "), path: " 085 + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message); 086 } 087 return false; 088 } 089 090 int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length; 091 092 if (messageTTL > -1 && hops >= messageTTL) { 093 if (LOG.isTraceEnabled()) { 094 LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message); 095 } 096 return false; 097 } 098 099 if (message.isAdvisory()) { 100 if (consumerInfo != null && consumerInfo.isNetworkSubscription() && isAdvisoryInterpretedByNetworkBridge(message)) { 101 // they will be interpreted by the bridge leading to dup commands 102 if (LOG.isTraceEnabled()) { 103 LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message); 104 } 105 return false; 106 } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) { 107 ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); 108 hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length; 109 if (consumerTTL > -1 && hops >= consumerTTL) { 110 if (LOG.isTraceEnabled()) { 111 LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message); 112 } 113 return false; 114 } 115 116 if (contains(info.getBrokerPath(), networkBrokerId)) { 117 LOG.trace("ConsumerInfo advisory all ready routed once through target broker (" 118 + networkBrokerId + "), path: " 119 + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message); 120 return false; 121 } 122 } 123 } 124 return true; 125 } 126 127 public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) { 128 return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); 129 } 130 131 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 132 if (brokerPath != null && brokerId != null) { 133 for (int i = 0; i < brokerPath.length; i++) { 134 if (brokerId.equals(brokerPath[i])) { 135 return true; 136 } 137 } 138 } 139 return false; 140 } 141 142 // keep for backward compat with older 143 // wire formats 144 public int getNetworkTTL() { 145 return messageTTL; 146 } 147 148 public void setNetworkTTL(int networkTTL) { 149 messageTTL = networkTTL; 150 consumerTTL = networkTTL; 151 } 152 153 /** 154 * @openwire:property version=1 cache=true 155 */ 156 public BrokerId getNetworkBrokerId() { 157 return networkBrokerId; 158 } 159 160 public void setNetworkBrokerId(BrokerId remoteBrokerPath) { 161 this.networkBrokerId = remoteBrokerPath; 162 } 163 164 public void setMessageTTL(int messageTTL) { 165 this.messageTTL = messageTTL; 166 } 167 168 /** 169 * @openwire:property version=10 170 */ 171 public int getMessageTTL() { 172 return this.messageTTL; 173 } 174 175 public void setConsumerTTL(int consumerTTL) { 176 this.consumerTTL = consumerTTL; 177 } 178 179 /** 180 * @openwire:property version=10 181 */ 182 public int getConsumerTTL() { 183 return this.consumerTTL; 184 } 185}