001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.util.concurrent.atomic.AtomicLong; 020 021import org.apache.activemq.state.CommandVisitor; 022 023/** 024 * 025 * @openwire:marshaller code="6" 026 * 027 */ 028public class ProducerInfo extends BaseCommand { 029 030 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO; 031 032 protected ProducerId producerId; 033 protected ActiveMQDestination destination; 034 protected BrokerId[] brokerPath; 035 protected boolean dispatchAsync; 036 protected int windowSize; 037 protected AtomicLong sentCount = new AtomicLong(); 038 039 public ProducerInfo() { 040 } 041 042 public ProducerInfo(ProducerId producerId) { 043 this.producerId = producerId; 044 } 045 046 public ProducerInfo(SessionInfo sessionInfo, long producerId) { 047 this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId); 048 } 049 050 public ProducerInfo copy() { 051 ProducerInfo info = new ProducerInfo(); 052 copy(info); 053 return info; 054 } 055 056 public void copy(ProducerInfo info) { 057 super.copy(info); 058 info.producerId = producerId; 059 info.destination = destination; 060 } 061 062 public byte getDataStructureType() { 063 return DATA_STRUCTURE_TYPE; 064 } 065 066 /** 067 * @openwire:property version=1 cache=true 068 */ 069 public ProducerId getProducerId() { 070 return producerId; 071 } 072 073 public void setProducerId(ProducerId producerId) { 074 this.producerId = producerId; 075 } 076 077 /** 078 * @openwire:property version=1 cache=true 079 */ 080 public ActiveMQDestination getDestination() { 081 return destination; 082 } 083 084 public void setDestination(ActiveMQDestination destination) { 085 this.destination = destination; 086 } 087 088 public RemoveInfo createRemoveCommand() { 089 RemoveInfo command = new RemoveInfo(getProducerId()); 090 command.setResponseRequired(isResponseRequired()); 091 return command; 092 } 093 094 /** 095 * The route of brokers the command has moved through. 096 * 097 * @openwire:property version=1 cache=true 098 */ 099 public BrokerId[] getBrokerPath() { 100 return brokerPath; 101 } 102 103 public void setBrokerPath(BrokerId[] brokerPath) { 104 this.brokerPath = brokerPath; 105 } 106 107 public Response visit(CommandVisitor visitor) throws Exception { 108 return visitor.processAddProducer(this); 109 } 110 111 /** 112 * If the broker should dispatch messages from this producer async. Since 113 * sync dispatch could potentally block the producer thread, this could be 114 * an important setting for the producer. 115 * 116 * @openwire:property version=2 117 */ 118 public boolean isDispatchAsync() { 119 return dispatchAsync; 120 } 121 122 public void setDispatchAsync(boolean dispatchAsync) { 123 this.dispatchAsync = dispatchAsync; 124 } 125 126 /** 127 * Used to configure the producer window size. A producer will send up to 128 * the configured window size worth of payload data to the broker before 129 * waiting for an Ack that allows him to send more. 130 * 131 * @openwire:property version=3 132 */ 133 public int getWindowSize() { 134 return windowSize; 135 } 136 137 public void setWindowSize(int windowSize) { 138 this.windowSize = windowSize; 139 } 140 141 public long getSentCount(){ 142 return sentCount.get(); 143 } 144 145 public void incrementSentCount(){ 146 sentCount.incrementAndGet(); 147 } 148 149 public void resetSentCount(){ 150 sentCount.set(0); 151 } 152 153}