/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.console.command;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ProducerThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.Session;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Console command that connects to a broker and starts one or more
 * {@link ProducerThread}s against a single destination, then waits for all of
 * them to signal completion.
 *
 * <p>All options are plain bean properties; the command reads them in
 * {@link #runTask(List)} and forwards them to every producer thread.
 */
public class ProducerCommand extends AbstractCommand {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);

    /** Broker URL handed to the {@link ActiveMQConnectionFactory}. */
    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    /** User name used when creating the connection. */
    String user = ActiveMQConnectionFactory.DEFAULT_USER;
    /** Password used when creating the connection. */
    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    /** Destination name in URI form, e.g. {@code queue://TEST}. */
    String destination = "queue://TEST";
    /** Message count forwarded to each producer thread. */
    int messageCount = 1000;
    /** Pause between sends, in milliseconds (as reported in the start-up log). */
    int sleep = 0;
    /** Whether messages are sent as persistent. */
    boolean persistent = true;
    /** Explicit message body forwarded to the producers; may be {@code null}. */
    String message = null;
    /** Payload URL forwarded to the producers; may be {@code null}. */
    String payloadUrl = null;
    /** Message size forwarded to the producers. */
    int messageSize = 0;
    /** Text message size forwarded to the producers. */
    int textMessageSize;
    /** Message time-to-live forwarded to the producers (units defined by ProducerThread). */
    long msgTTL = 0L;
    /** Message group id forwarded to the producers; may be {@code null}. */
    String msgGroupID = null;
    /** Transaction batch size; any non-zero value makes the sessions transacted. */
    int transactionBatchSize;
    /** Number of producer threads to start. */
    private int parallelThreads = 1;

    /**
     * Opens a connection to the configured broker, starts
     * {@code parallelThreads} producer threads and blocks until the shared
     * latch reaches zero. The connection is always closed on exit.
     *
     * @param tokens remaining command-line tokens (not used by this command)
     * @throws Exception if the connection or a session cannot be created, or
     *                   if the wait for the producers is interrupted
     */
    @Override
    protected void runTask(List<String> tokens) throws Exception {
        LOG.info("Connecting to URL: {} as user: {}", brokerUrl, user);
        LOG.info("Producing messages to {}", destination);
        LOG.info("Using {} messages", persistent ? "persistent" : "non-persistent");
        LOG.info("Sleeping between sends {} ms", sleep);
        LOG.info("Running {} parallel threads", parallelThreads);

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        Connection conn = null;
        try {
            conn = factory.createConnection(user, password);
            conn.start();

            // One count per producer; each producer receives the latch via
            // setFinished and is expected to count it down when it is done.
            CountDownLatch active = new CountDownLatch(parallelThreads);

            for (int i = 1; i <= parallelThreads; i++) {
                // A JMS Session is a single-threaded context, so every
                // producer thread gets its own session instead of sharing
                // one. In transacted mode this also gives each thread its
                // own transaction, so commits do not interfere.
                Session sess;
                if (transactionBatchSize != 0) {
                    sess = conn.createSession(true, Session.SESSION_TRANSACTED);
                } else {
                    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                }

                ProducerThread producer = new ProducerThread(sess,
                        ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
                producer.setName("producer-" + i);
                producer.setMessageCount(messageCount);
                producer.setSleep(sleep);
                producer.setMsgTTL(msgTTL);
                producer.setPersistent(persistent);
                producer.setTransactionBatchSize(transactionBatchSize);
                producer.setMessage(message);
                producer.setPayloadUrl(payloadUrl);
                producer.setMessageSize(messageSize);
                producer.setMsgGroupID(msgGroupID);
                producer.setTextMessageSize(textMessageSize);
                producer.setFinished(active);
                producer.start();
            }

            active.await();
        } finally {
            // Closing the connection also closes every session created from it.
            if (conn != null) {
                conn.close();
            }
        }
    }

    /** @return the broker URL used to create the connection factory */
    public String getBrokerUrl() {
        return brokerUrl;
    }

    /** @param brokerUrl the broker URL used to create the connection factory */
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    /** @return the destination name messages are produced to */
    public String getDestination() {
        return destination;
    }

    /** @param destination the destination name messages are produced to */
    public void setDestination(String destination) {
        this.destination = destination;
    }

    /** @return the message count passed to each producer thread */
    public int getMessageCount() {
        return messageCount;
    }

    /** @param messageCount the message count passed to each producer thread */
    public void setMessageCount(int messageCount) {
        this.messageCount = messageCount;
    }

    /** @return the pause between sends, in milliseconds */
    public int getSleep() {
        return sleep;
    }

    /** @param sleep the pause between sends, in milliseconds */
    public void setSleep(int sleep) {
        this.sleep = sleep;
    }

    /** @return {@code true} if persistent messages are requested */
    public boolean isPersistent() {
        return persistent;
    }

    /** @param persistent {@code true} to request persistent messages */
    public void setPersistent(boolean persistent) {
        this.persistent = persistent;
    }

    /** @return the message size passed to the producers */
    public int getMessageSize() {
        return messageSize;
    }

    /** @param messageSize the message size passed to the producers */
    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    /** @return the text message size passed to the producers */
    public int getTextMessageSize() {
        return textMessageSize;
    }

    /** @param textMessageSize the text message size passed to the producers */
    public void setTextMessageSize(int textMessageSize) {
        this.textMessageSize = textMessageSize;
    }

    /** @return the message time-to-live passed to the producers */
    public long getMsgTTL() {
        return msgTTL;
    }

    /** @param msgTTL the message time-to-live passed to the producers */
    public void setMsgTTL(long msgTTL) {
        this.msgTTL = msgTTL;
    }

    /** @return the message group id passed to the producers, possibly {@code null} */
    public String getMsgGroupID() {
        return msgGroupID;
    }

    /** @param msgGroupID the message group id passed to the producers */
    public void setMsgGroupID(String msgGroupID) {
        this.msgGroupID = msgGroupID;
    }

    /** @return the transaction batch size; zero means non-transacted sessions */
    public int getTransactionBatchSize() {
        return transactionBatchSize;
    }

    /** @param transactionBatchSize the batch size; non-zero selects transacted sessions */
    public void setTransactionBatchSize(int transactionBatchSize) {
        this.transactionBatchSize = transactionBatchSize;
    }

    /** @return the user name used for the connection */
    public String getUser() {
        return user;
    }

    /** @param user the user name used for the connection */
    public void setUser(String user) {
        this.user = user;
    }

    /** @return the password used for the connection */
    public String getPassword() {
        return password;
    }

    /** @param password the password used for the connection */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the number of producer threads to start */
    public int getParallelThreads() {
        return parallelThreads;
    }

    /** @param parallelThreads the number of producer threads to start */
    public void setParallelThreads(int parallelThreads) {
        this.parallelThreads = parallelThreads;
    }

    /** @return the payload URL passed to the producers, possibly {@code null} */
    public String getPayloadUrl() {
        return payloadUrl;
    }

    /** @param payloadUrl the payload URL passed to the producers */
    public void setPayloadUrl(String payloadUrl) {
        this.payloadUrl = payloadUrl;
    }

    /** @return the explicit message body passed to the producers, possibly {@code null} */
    public String getMessage() {
        return message;
    }

    /** @param message the explicit message body passed to the producers */
    public void setMessage(String message) {
        this.message = message;
    }

    /** Prints this command's help by delegating to {@code printHelpFromFile()}. */
    @Override
    protected void printHelp() {
        printHelpFromFile();
    }

    /** @return the command name, {@code "producer"} */
    @Override
    public String getName() {
        return "producer";
    }

    /** @return a one-line description of this command */
    @Override
    public String getOneLineDescription() {
        return "Sends messages to the broker";
    }
}