001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019import org.slf4j.Logger; 020import org.slf4j.LoggerFactory; 021 022import javax.jms.*; 023import java.io.*; 024import java.net.URL; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicInteger; 027 028public class ProducerThread extends Thread { 029 030 private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class); 031 032 int messageCount = 1000; 033 boolean runIndefinitely = false; 034 Destination destination; 035 protected Session session; 036 int sleep = 0; 037 boolean persistent = true; 038 int messageSize = 0; 039 int textMessageSize; 040 long msgTTL = 0L; 041 String msgGroupID=null; 042 int transactionBatchSize; 043 044 int transactions = 0; 045 AtomicInteger sentCount = new AtomicInteger(0); 046 String message; 047 String messageText = null; 048 String payloadUrl = null; 049 byte[] payload = null; 050 boolean running = false; 051 CountDownLatch finished; 052 CountDownLatch paused = new CountDownLatch(0); 053 054 055 public ProducerThread(Session session, Destination destination) { 056 this.destination = destination; 057 this.session = session; 058 } 059 060 public void run() { 061 MessageProducer producer = null; 062 String threadName = Thread.currentThread().getName(); 063 try { 064 producer = session.createProducer(destination); 065 producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 066 producer.setTimeToLive(msgTTL); 067 initPayLoad(); 068 running = true; 069 070 LOG.info(threadName + " Started to calculate elapsed time ...\n"); 071 long tStart = System.currentTimeMillis(); 072 073 if (runIndefinitely) { 074 while (running) { 075 synchronized (this) { 076 paused.await(); 077 } 078 sendMessage(producer, threadName); 079 sentCount.incrementAndGet(); 080 } 081 }else{ 082 for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet()) { 083 synchronized (this) { 084 paused.await(); 085 } 086 sendMessage(producer, threadName); 087 } 088 } 089 090 LOG.info(threadName + " Produced: " + this.getSentCount() + " messages"); 091 long tEnd = System.currentTimeMillis(); 092 long elapsed = (tEnd - tStart) / 1000; 093 LOG.info(threadName + " Elapsed time in second : " + elapsed + " s"); 094 LOG.info(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds"); 095 096 } catch (Exception e) { 097 e.printStackTrace(); 098 } finally { 099 if (finished != null) { 100 finished.countDown(); 101 } 102 if (producer != null) { 103 try { 104 producer.close(); 105 } catch (JMSException e) { 106 e.printStackTrace(); 107 } 108 } 109 } 110 } 111 112 private void sendMessage(MessageProducer producer, String threadName) throws Exception { 113 Message message = createMessage(sentCount.get()); 114 producer.send(message); 115 if (LOG.isDebugEnabled()) { 116 LOG.debug(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID())); 117 } 118 119 if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) { 120 LOG.info(threadName + " Committing transaction: " + transactions++); 121 session.commit(); 122 } 123 124 if (sleep > 0) { 125 Thread.sleep(sleep); 126 } 127 } 128 129 private void initPayLoad() { 130 if (messageSize > 0) { 131 payload = new byte[messageSize]; 132 for (int i = 0; i < payload.length; i++) { 133 payload[i] = '.'; 134 } 135 } 136 } 137 138 protected Message createMessage(int i) throws Exception { 139 Message answer; 140 if (payload != null) { 141 answer = session.createBytesMessage(); 142 ((BytesMessage) answer).writeBytes(payload); 143 } else { 144 if (textMessageSize > 0) { 145 if (messageText == null) { 146 messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i); 147 } 148 } else if (payloadUrl != null) { 149 messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i); 150 } else if (message != null) { 151 messageText = message; 152 } else { 153 messageText = createDefaultMessage(i); 154 } 155 answer = session.createTextMessage(messageText); 156 } 157 if ((msgGroupID != null) && (!msgGroupID.isEmpty())) { 158 answer.setStringProperty("JMSXGroupID", msgGroupID); 159 } 160 return answer; 161 } 162 163 private String readInputStream(InputStream is, int size, int messageNumber) throws IOException { 164 InputStreamReader reader = new InputStreamReader(is); 165 try { 166 char[] buffer; 167 if (size > 0) { 168 buffer = new char[size]; 169 } else { 170 buffer = new char[1024]; 171 } 172 int count; 173 StringBuilder builder = new StringBuilder(); 174 while ((count = reader.read(buffer)) != -1) { 175 builder.append(buffer, 0, count); 176 if (size > 0) break; 177 } 178 return builder.toString(); 179 } catch (IOException ioe) { 180 return createDefaultMessage(messageNumber); 181 } finally { 182 reader.close(); 183 } 184 } 185 186 private String createDefaultMessage(int messageNumber) { 187 return "test message: " + messageNumber; 188 } 189 190 public void setMessageCount(int messageCount) { 191 this.messageCount = messageCount; 192 } 193 194 public int getSleep() { 195 return sleep; 196 } 197 198 public void setSleep(int sleep) { 199 this.sleep = sleep; 200 } 201 202 public int getMessageCount() { 203 return messageCount; 204 } 205 206 public int getSentCount() { 207 return sentCount.get(); 208 } 209 210 public boolean isPersistent() { 211 return persistent; 212 } 213 214 public void setPersistent(boolean persistent) { 215 this.persistent = persistent; 216 } 217 218 public boolean isRunning() { 219 return running; 220 } 221 222 public void setRunning(boolean running) { 223 this.running = running; 224 } 225 226 public long getMsgTTL() { 227 return msgTTL; 228 } 229 230 public void setMsgTTL(long msgTTL) { 231 this.msgTTL = msgTTL; 232 } 233 234 public int getTransactionBatchSize() { 235 return transactionBatchSize; 236 } 237 238 public void setTransactionBatchSize(int transactionBatchSize) { 239 this.transactionBatchSize = transactionBatchSize; 240 } 241 242 public String getMsgGroupID() { 243 return msgGroupID; 244 } 245 246 public void setMsgGroupID(String msgGroupID) { 247 this.msgGroupID = msgGroupID; 248 } 249 250 public int getTextMessageSize() { 251 return textMessageSize; 252 } 253 254 public void setTextMessageSize(int textMessageSize) { 255 this.textMessageSize = textMessageSize; 256 } 257 258 public int getMessageSize() { 259 return messageSize; 260 } 261 262 public void setMessageSize(int messageSize) { 263 this.messageSize = messageSize; 264 } 265 266 public CountDownLatch getFinished() { 267 return finished; 268 } 269 270 public void setFinished(CountDownLatch finished) { 271 this.finished = finished; 272 } 273 274 public String getPayloadUrl() { 275 return payloadUrl; 276 } 277 278 public void setPayloadUrl(String payloadUrl) { 279 this.payloadUrl = payloadUrl; 280 } 281 282 public String getMessage() { 283 return message; 284 } 285 286 public void setMessage(String message) { 287 this.message = message; 288 } 289 290 public boolean isRunIndefinitely() { 291 return runIndefinitely; 292 } 293 294 public void setRunIndefinitely(boolean runIndefinitely) { 295 this.runIndefinitely = runIndefinitely; 296 } 297 298 public synchronized void pauseProducer(){ 299 this.paused = new CountDownLatch(1); 300 } 301 302 public synchronized void resumeProducer(){ 303 this.paused.countDown(); 304 } 305 306 public void resetCounters(){ 307 this.sentCount.set(0); 308 } 309}