001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command; 018 019import org.apache.activemq.ActiveMQConnectionFactory; 020import org.apache.activemq.command.ActiveMQDestination; 021import org.apache.activemq.util.ConsumerThread; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025import javax.jms.Connection; 026import javax.jms.Session; 027import java.util.List; 028import java.util.concurrent.CountDownLatch; 029 030public class ConsumerCommand extends AbstractCommand { 031 private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class); 032 033 String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 034 String user = ActiveMQConnectionFactory.DEFAULT_USER; 035 String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 036 String destination = "queue://TEST"; 037 int messageCount = 1000; 038 int sleep; 039 boolean transacted; 040 private boolean durable; 041 private String clientId; 042 int batchSize = 10; 043 int ackMode = Session.AUTO_ACKNOWLEDGE; 044 int parallelThreads = 1; 045 boolean bytesAsText; 046 047 @Override 048 protected void runTask(List<String> tokens) throws Exception { 049 LOG.info("Connecting to URL: " + brokerUrl + " as user: " + user); 050 LOG.info("Consuming " + destination); 051 LOG.info("Sleeping between receives " + sleep + " ms"); 052 LOG.info("Running " + parallelThreads + " parallel threads"); 053 054 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); 055 Connection conn = null; 056 try { 057 conn = factory.createConnection(user, password); 058 if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { 059 conn.setClientID(clientId); 060 } 061 conn.start(); 062 063 Session sess; 064 if (transacted) { 065 sess = conn.createSession(true, Session.SESSION_TRANSACTED); 066 } else { 067 sess = conn.createSession(false, ackMode); 068 } 069 070 071 CountDownLatch active = new CountDownLatch(parallelThreads); 072 073 for (int i = 1; i <= parallelThreads; i++) { 074 ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE)); 075 consumer.setName("consumer-" + i); 076 consumer.setDurable(durable); 077 consumer.setBreakOnNull(false); 078 consumer.setMessageCount(messageCount); 079 consumer.setSleep(sleep); 080 consumer.setBatchSize(batchSize); 081 consumer.setFinished(active); 082 consumer.setBytesAsText(bytesAsText); 083 consumer.start(); 084 } 085 086 active.await(); 087 } finally { 088 if (conn != null) { 089 conn.close(); 090 } 091 } 092 } 093 094 public String getBrokerUrl() { 095 return brokerUrl; 096 } 097 098 public void setBrokerUrl(String brokerUrl) { 099 this.brokerUrl = brokerUrl; 100 } 101 102 public String getUser() { 103 return user; 104 } 105 106 public void setUser(String user) { 107 this.user = user; 108 } 109 110 public String getPassword() { 111 return password; 112 } 113 114 public void setPassword(String password) { 115 this.password = password; 116 } 117 118 public String getDestination() { 119 return destination; 120 } 121 122 public void setDestination(String destination) { 123 this.destination = destination; 124 } 125 126 public int getMessageCount() { 127 return messageCount; 128 } 129 130 public void setMessageCount(int messageCount) { 131 this.messageCount = messageCount; 132 } 133 134 public int getSleep() { 135 return sleep; 136 } 137 138 public void setSleep(int sleep) { 139 this.sleep = sleep; 140 } 141 142 public int getBatchSize() { 143 return batchSize; 144 } 145 146 public void setBatchSize(int batchSize) { 147 this.batchSize = batchSize; 148 } 149 150 public int getParallelThreads() { 151 return parallelThreads; 152 } 153 154 public void setParallelThreads(int parallelThreads) { 155 this.parallelThreads = parallelThreads; 156 } 157 158 public boolean isBytesAsText() { 159 return bytesAsText; 160 } 161 162 public void setBytesAsText(boolean bytesAsText) { 163 this.bytesAsText = bytesAsText; 164 } 165 166 public boolean isTransacted() { 167 return transacted; 168 } 169 170 public void setTransacted(boolean transacted) { 171 this.transacted = transacted; 172 } 173 174 public int getAckMode() { 175 return ackMode; 176 } 177 178 public void setAckMode(String ackMode) { 179 if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) { 180 this.ackMode = Session.CLIENT_ACKNOWLEDGE; 181 } 182 if ("AUTO_ACKNOWLEDGE".equals(ackMode)) { 183 this.ackMode = Session.AUTO_ACKNOWLEDGE; 184 } 185 if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) { 186 this.ackMode = Session.DUPS_OK_ACKNOWLEDGE; 187 } 188 } 189 190 public boolean isDurable() { 191 return durable; 192 } 193 194 public void setDurable(boolean durable) { 195 this.durable = durable; 196 } 197 198 public String getClientId() { 199 return clientId; 200 } 201 202 public void setClientId(String clientId) { 203 this.clientId = clientId; 204 } 205 206 @Override 207 protected void printHelp() { 208 printHelpFromFile(); 209 } 210 211 @Override 212 public String getName() { 213 return "consumer"; 214 } 215 216 @Override 217 public String getOneLineDescription() { 218 return "Receives messages from the broker"; 219 } 220}