001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.IOException; 020import java.util.concurrent.atomic.AtomicBoolean; 021import java.util.concurrent.atomic.AtomicLong; 022 023import org.apache.activemq.broker.region.Destination; 024import org.apache.activemq.broker.region.Region; 025import org.apache.activemq.command.Message; 026import org.apache.activemq.command.MessageId; 027import org.apache.activemq.state.ProducerState; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Holds internal state in the broker for a MessageProducer 033 */ 034public class ProducerBrokerExchange { 035 036 private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class); 037 private ConnectionContext connectionContext; 038 private Destination regionDestination; 039 private Region region; 040 private ProducerState producerState; 041 private boolean mutable = true; 042 private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); 043 private boolean auditProducerSequenceIds; 044 private boolean isNetworkProducer; 045 private BrokerService brokerService; 046 private FlowControlInfo flowControlInfo = new FlowControlInfo(); 047 048 public ProducerBrokerExchange() { 049 } 050 051 public ProducerBrokerExchange copy() { 052 ProducerBrokerExchange rc = new ProducerBrokerExchange(); 053 rc.connectionContext = connectionContext.copy(); 054 rc.regionDestination = regionDestination; 055 rc.region = region; 056 rc.producerState = producerState; 057 rc.mutable = mutable; 058 rc.flowControlInfo = flowControlInfo; 059 return rc; 060 } 061 062 063 /** 064 * @return the connectionContext 065 */ 066 public ConnectionContext getConnectionContext() { 067 return this.connectionContext; 068 } 069 070 /** 071 * @param connectionContext the connectionContext to set 072 */ 073 public void setConnectionContext(ConnectionContext connectionContext) { 074 this.connectionContext = connectionContext; 075 } 076 077 /** 078 * @return the mutable 079 */ 080 public boolean isMutable() { 081 return this.mutable; 082 } 083 084 /** 085 * @param mutable the mutable to set 086 */ 087 public void setMutable(boolean mutable) { 088 this.mutable = mutable; 089 } 090 091 /** 092 * @return the regionDestination 093 */ 094 public Destination getRegionDestination() { 095 return this.regionDestination; 096 } 097 098 /** 099 * @param regionDestination the regionDestination to set 100 */ 101 public void setRegionDestination(Destination regionDestination) { 102 this.regionDestination = regionDestination; 103 } 104 105 /** 106 * @return the region 107 */ 108 public Region getRegion() { 109 return this.region; 110 } 111 112 /** 113 * @param region the region to set 114 */ 115 public void setRegion(Region region) { 116 this.region = region; 117 } 118 119 /** 120 * @return the producerState 121 */ 122 public ProducerState getProducerState() { 123 return this.producerState; 124 } 125 126 /** 127 * @param producerState the producerState to set 128 */ 129 public void setProducerState(ProducerState producerState) { 130 this.producerState = producerState; 131 } 132 133 /** 134 * Enforce duplicate suppression using info from persistence adapter 135 * 136 * @return false if message should be ignored as a duplicate 137 */ 138 public boolean canDispatch(Message messageSend) { 139 boolean canDispatch = true; 140 if (auditProducerSequenceIds && messageSend.isPersistent()) { 141 final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId(); 142 if (isNetworkProducer) { 143 // messages are multiplexed on this producer so we need to query the persistenceAdapter 144 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); 145 if (producerSequenceId <= lastStoredForMessageProducer) { 146 canDispatch = false; 147 LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 148 (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer 149 }); 150 } 151 } else if (producerSequenceId <= lastSendSequenceNumber.get()) { 152 canDispatch = false; 153 if (messageSend.isInTransaction()) { 154 LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{ 155 (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber 156 }); 157 } else { 158 LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{ 159 (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber 160 }); 161 162 } 163 } else { 164 // track current so we can suppress duplicates later in the stream 165 lastSendSequenceNumber.set(producerSequenceId); 166 } 167 } 168 return canDispatch; 169 } 170 171 private long getStoredSequenceIdForMessage(MessageId messageId) { 172 try { 173 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 174 } catch (IOException ignored) { 175 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 176 } 177 return -1; 178 } 179 180 public void setLastStoredSequenceId(long l) { 181 auditProducerSequenceIds = true; 182 if (connectionContext.isNetworkConnection()) { 183 brokerService = connectionContext.getBroker().getBrokerService(); 184 isNetworkProducer = true; 185 } 186 lastSendSequenceNumber.set(l); 187 LOG.debug("last stored sequence id set: {}", l); 188 } 189 190 public void incrementSend() { 191 flowControlInfo.incrementSend(); 192 } 193 194 public void blockingOnFlowControl(boolean blockingOnFlowControl) { 195 flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl); 196 } 197 198 public void incrementTimeBlocked(Destination destination, long timeBlocked) { 199 flowControlInfo.incrementTimeBlocked(timeBlocked); 200 } 201 202 203 public boolean isBlockedForFlowControl() { 204 return flowControlInfo.isBlockingOnFlowControl(); 205 } 206 207 public void resetFlowControl() { 208 flowControlInfo.reset(); 209 } 210 211 public long getTotalTimeBlocked() { 212 return flowControlInfo.getTotalTimeBlocked(); 213 } 214 215 public int getPercentageBlocked() { 216 double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends(); 217 return (int) value * 100; 218 } 219 220 221 public static class FlowControlInfo { 222 private AtomicBoolean blockingOnFlowControl = new AtomicBoolean(); 223 private AtomicLong totalSends = new AtomicLong(); 224 private AtomicLong sendsBlocked = new AtomicLong(); 225 private AtomicLong totalTimeBlocked = new AtomicLong(); 226 227 228 public boolean isBlockingOnFlowControl() { 229 return blockingOnFlowControl.get(); 230 } 231 232 public void setBlockingOnFlowControl(boolean blockingOnFlowControl) { 233 this.blockingOnFlowControl.set(blockingOnFlowControl); 234 if (blockingOnFlowControl) { 235 incrementSendBlocked(); 236 } 237 } 238 239 240 public long getTotalSends() { 241 return totalSends.get(); 242 } 243 244 public void incrementSend() { 245 this.totalSends.incrementAndGet(); 246 } 247 248 public long getSendsBlocked() { 249 return sendsBlocked.get(); 250 } 251 252 public void incrementSendBlocked() { 253 this.sendsBlocked.incrementAndGet(); 254 } 255 256 public long getTotalTimeBlocked() { 257 return totalTimeBlocked.get(); 258 } 259 260 public void incrementTimeBlocked(long time) { 261 this.totalTimeBlocked.addAndGet(time); 262 } 263 264 public void reset() { 265 blockingOnFlowControl.set(false); 266 totalSends.set(0); 267 sendsBlocked.set(0); 268 totalTimeBlocked.set(0); 269 270 } 271 } 272}