001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command;
018
019import org.apache.activemq.ActiveMQConnectionFactory;
020import org.apache.activemq.command.ActiveMQDestination;
021import org.apache.activemq.util.ProducerThread;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import javax.jms.Connection;
026import javax.jms.Session;
027import java.util.List;
028import java.util.concurrent.CountDownLatch;
029
030public class ProducerCommand extends AbstractCommand {
031    private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
032
033    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
034    String user = ActiveMQConnectionFactory.DEFAULT_USER;
035    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
036    String destination = "queue://TEST";
037    int messageCount = 1000;
038    int sleep = 0;
039    boolean persistent = true;
040    String message = null;
041    String payloadUrl = null;
042    int messageSize = 0;
043    int textMessageSize;
044    long msgTTL = 0L;
045    String msgGroupID=null;
046    int transactionBatchSize;
047    private int parallelThreads = 1;
048
049    @Override
050    protected void runTask(List<String> tokens) throws Exception {
051        LOG.info("Connecting to URL: " + brokerUrl + " as user: " + user);
052        LOG.info("Producing messages to " + destination);
053        LOG.info("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
054        LOG.info("Sleeping between sends " + sleep + " ms");
055        LOG.info("Running " + parallelThreads + " parallel threads");
056
057        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
058        Connection conn = null;
059        try {
060            conn = factory.createConnection(user, password);
061            conn.start();
062
063            Session sess;
064            if (transactionBatchSize != 0) {
065                sess = conn.createSession(true, Session.SESSION_TRANSACTED);
066            } else {
067                sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
068            }
069
070            CountDownLatch active = new CountDownLatch(parallelThreads);
071
072            for (int i = 1; i <= parallelThreads; i++) {
073                ProducerThread producer = new ProducerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
074                producer.setName("producer-" + i);
075                producer.setMessageCount(messageCount);
076                producer.setSleep(sleep);
077                producer.setMsgTTL(msgTTL);
078                producer.setPersistent(persistent);
079                producer.setTransactionBatchSize(transactionBatchSize);
080                producer.setMessage(message);
081                producer.setPayloadUrl(payloadUrl);
082                producer.setMessageSize(messageSize);
083                producer.setMsgGroupID(msgGroupID);
084                producer.setTextMessageSize(textMessageSize);
085                producer.setFinished(active);
086                producer.start();
087            }
088
089            active.await();
090        } finally {
091            if (conn != null) {
092                conn.close();
093            }
094        }
095    }
096
097    public String getBrokerUrl() {
098        return brokerUrl;
099    }
100
101    public void setBrokerUrl(String brokerUrl) {
102        this.brokerUrl = brokerUrl;
103    }
104
105    public String getDestination() {
106        return destination;
107    }
108
109    public void setDestination(String destination) {
110        this.destination = destination;
111    }
112
113    public int getMessageCount() {
114        return messageCount;
115    }
116
117    public void setMessageCount(int messageCount) {
118        this.messageCount = messageCount;
119    }
120
121    public int getSleep() {
122        return sleep;
123    }
124
125    public void setSleep(int sleep) {
126        this.sleep = sleep;
127    }
128
129    public boolean isPersistent() {
130        return persistent;
131    }
132
133    public void setPersistent(boolean persistent) {
134        this.persistent = persistent;
135    }
136
137    public int getMessageSize() {
138        return messageSize;
139    }
140
141    public void setMessageSize(int messageSize) {
142        this.messageSize = messageSize;
143    }
144
145    public int getTextMessageSize() {
146        return textMessageSize;
147    }
148
149    public void setTextMessageSize(int textMessageSize) {
150        this.textMessageSize = textMessageSize;
151    }
152
153    public long getMsgTTL() {
154        return msgTTL;
155    }
156
157    public void setMsgTTL(long msgTTL) {
158        this.msgTTL = msgTTL;
159    }
160
161    public String getMsgGroupID() {
162        return msgGroupID;
163    }
164
165    public void setMsgGroupID(String msgGroupID) {
166        this.msgGroupID = msgGroupID;
167    }
168
169    public int getTransactionBatchSize() {
170        return transactionBatchSize;
171    }
172
173    public void setTransactionBatchSize(int transactionBatchSize) {
174        this.transactionBatchSize = transactionBatchSize;
175    }
176
177    public String getUser() {
178        return user;
179    }
180
181    public void setUser(String user) {
182        this.user = user;
183    }
184
185    public String getPassword() {
186        return password;
187    }
188
189    public void setPassword(String password) {
190        this.password = password;
191    }
192
193    public int getParallelThreads() {
194        return parallelThreads;
195    }
196
197    public void setParallelThreads(int parallelThreads) {
198        this.parallelThreads = parallelThreads;
199    }
200
201    public String getPayloadUrl() {
202        return payloadUrl;
203    }
204
205    public void setPayloadUrl(String payloadUrl) {
206        this.payloadUrl = payloadUrl;
207    }
208
209    public String getMessage() {
210        return message;
211    }
212
213    public void setMessage(String message) {
214        this.message = message;
215    }
216
217    @Override
218    protected void printHelp() {
219        printHelpFromFile();
220    }
221
222    @Override
223    public String getName() {
224        return "producer";
225    }
226
227    @Override
228    public String getOneLineDescription() {
229        return "Sends messages to the broker";
230    }
231}