001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.console.command;
018
019import org.apache.activemq.ActiveMQConnectionFactory;
020import org.apache.activemq.command.ActiveMQDestination;
021import org.apache.activemq.util.ConsumerThread;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import javax.jms.Connection;
026import javax.jms.Session;
027import java.util.List;
028import java.util.concurrent.CountDownLatch;
029
030public class ConsumerCommand extends AbstractCommand {
031    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class);
032
033    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
034    String user = ActiveMQConnectionFactory.DEFAULT_USER;
035    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
036    String destination = "queue://TEST";
037    int messageCount = 1000;
038    int sleep;
039    boolean transacted;
040    private boolean durable;
041    private String clientId;
042    int batchSize = 10;
043    int ackMode = Session.AUTO_ACKNOWLEDGE;
044    int parallelThreads = 1;
045    boolean bytesAsText;
046
047    @Override
048    protected void runTask(List<String> tokens) throws Exception {
049        LOG.info("Connecting to URL: " + brokerUrl + " as user: " + user);
050        LOG.info("Consuming " + destination);
051        LOG.info("Sleeping between receives " + sleep + " ms");
052        LOG.info("Running " + parallelThreads + " parallel threads");
053
054        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
055        Connection conn = null;
056        try {
057            conn = factory.createConnection(user, password);
058            if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
059                conn.setClientID(clientId);
060            }
061            conn.start();
062
063            Session sess;
064            if (transacted) {
065                sess = conn.createSession(true, Session.SESSION_TRANSACTED);
066            } else {
067                sess = conn.createSession(false, ackMode);
068            }
069
070
071            CountDownLatch active = new CountDownLatch(parallelThreads);
072
073            for (int i = 1; i <= parallelThreads; i++) {
074                ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
075                consumer.setName("consumer-" + i);
076                consumer.setDurable(durable);
077                consumer.setBreakOnNull(false);
078                consumer.setMessageCount(messageCount);
079                consumer.setSleep(sleep);
080                consumer.setBatchSize(batchSize);
081                consumer.setFinished(active);
082                consumer.setBytesAsText(bytesAsText);
083                consumer.start();
084            }
085
086            active.await();
087        } finally {
088            if (conn != null) {
089                conn.close();
090            }
091        }
092    }
093
094    public String getBrokerUrl() {
095        return brokerUrl;
096    }
097
098    public void setBrokerUrl(String brokerUrl) {
099        this.brokerUrl = brokerUrl;
100    }
101
102    public String getUser() {
103        return user;
104    }
105
106    public void setUser(String user) {
107        this.user = user;
108    }
109
110    public String getPassword() {
111        return password;
112    }
113
114    public void setPassword(String password) {
115        this.password = password;
116    }
117
118    public String getDestination() {
119        return destination;
120    }
121
122    public void setDestination(String destination) {
123        this.destination = destination;
124    }
125
126    public int getMessageCount() {
127        return messageCount;
128    }
129
130    public void setMessageCount(int messageCount) {
131        this.messageCount = messageCount;
132    }
133
134    public int getSleep() {
135        return sleep;
136    }
137
138    public void setSleep(int sleep) {
139        this.sleep = sleep;
140    }
141
142    public int getBatchSize() {
143        return batchSize;
144    }
145
146    public void setBatchSize(int batchSize) {
147        this.batchSize = batchSize;
148    }
149
150    public int getParallelThreads() {
151        return parallelThreads;
152    }
153
154    public void setParallelThreads(int parallelThreads) {
155        this.parallelThreads = parallelThreads;
156    }
157
158    public boolean isBytesAsText() {
159        return bytesAsText;
160    }
161
162    public void setBytesAsText(boolean bytesAsText) {
163        this.bytesAsText = bytesAsText;
164    }
165
166    public boolean isTransacted() {
167        return transacted;
168    }
169
170    public void setTransacted(boolean transacted) {
171        this.transacted = transacted;
172    }
173
174    public int getAckMode() {
175        return ackMode;
176    }
177
178    public void setAckMode(String ackMode) {
179        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
180            this.ackMode = Session.CLIENT_ACKNOWLEDGE;
181        }
182        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
183            this.ackMode = Session.AUTO_ACKNOWLEDGE;
184        }
185        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
186            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
187        }
188    }
189
190    public boolean isDurable() {
191        return durable;
192    }
193
194    public void setDurable(boolean durable) {
195        this.durable = durable;
196    }
197
198    public String getClientId() {
199        return clientId;
200    }
201
202    public void setClientId(String clientId) {
203        this.clientId = clientId;
204    }
205
206    @Override
207    protected void printHelp() {
208        printHelpFromFile();
209    }
210
211    @Override
212    public String getName() {
213        return "consumer";
214    }
215
216    @Override
217    public String getOneLineDescription() {
218        return "Receives messages from the broker";
219    }
220}