001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import org.slf4j.Logger;
020import org.slf4j.LoggerFactory;
021
022import javax.jms.*;
023import java.io.*;
024import java.net.URL;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicInteger;
027
028public class ProducerThread extends Thread {
029
030    private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
031
032    int messageCount = 1000;
033    boolean runIndefinitely = false;
034    Destination destination;
035    protected Session session;
036    int sleep = 0;
037    boolean persistent = true;
038    int messageSize = 0;
039    int textMessageSize;
040    long msgTTL = 0L;
041    String msgGroupID=null;
042    int transactionBatchSize;
043
044    int transactions = 0;
045    AtomicInteger sentCount = new AtomicInteger(0);
046    String message;
047    String messageText = null;
048    String payloadUrl = null;
049    byte[] payload = null;
050    boolean running = false;
051    CountDownLatch finished;
052    CountDownLatch paused = new CountDownLatch(0);
053
054
055    public ProducerThread(Session session, Destination destination) {
056        this.destination = destination;
057        this.session = session;
058    }
059
060    public void run() {
061        MessageProducer producer = null;
062        String threadName = Thread.currentThread().getName();
063        try {
064            producer = session.createProducer(destination);
065            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
066            producer.setTimeToLive(msgTTL);
067            initPayLoad();
068            running = true;
069
070            LOG.info(threadName +  " Started to calculate elapsed time ...\n");
071            long tStart = System.currentTimeMillis();
072
073            if (runIndefinitely) {
074                while (running) {
075                    synchronized (this) {
076                        paused.await();
077                    }
078                    sendMessage(producer, threadName);
079                    sentCount.incrementAndGet();
080                }
081            }else{
082                for (sentCount.set(0); sentCount.get() < messageCount && running; sentCount.incrementAndGet()) {
083                    synchronized (this) {
084                        paused.await();
085                    }
086                    sendMessage(producer, threadName);
087                }
088            }
089
090            LOG.info(threadName + " Produced: " + this.getSentCount() + " messages");
091            long tEnd = System.currentTimeMillis();
092            long elapsed = (tEnd - tStart) / 1000;
093            LOG.info(threadName + " Elapsed time in second : " + elapsed + " s");
094            LOG.info(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
095
096        } catch (Exception e) {
097            e.printStackTrace();
098        } finally {
099            if (finished != null) {
100                finished.countDown();
101            }
102            if (producer != null) {
103                try {
104                    producer.close();
105                } catch (JMSException e) {
106                    e.printStackTrace();
107                }
108            }
109        }
110    }
111
112    private void sendMessage(MessageProducer producer, String threadName) throws Exception {
113        Message message = createMessage(sentCount.get());
114        producer.send(message);
115        if (LOG.isDebugEnabled()) {
116            LOG.debug(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
117        }
118
119        if (transactionBatchSize > 0 && sentCount.get() > 0 && sentCount.get() % transactionBatchSize == 0) {
120            LOG.info(threadName + " Committing transaction: " + transactions++);
121            session.commit();
122        }
123
124        if (sleep > 0) {
125            Thread.sleep(sleep);
126        }
127    }
128
129    private void initPayLoad() {
130        if (messageSize > 0) {
131            payload = new byte[messageSize];
132            for (int i = 0; i < payload.length; i++) {
133                payload[i] = '.';
134            }
135        }
136    }
137
138    protected Message createMessage(int i) throws Exception {
139        Message answer;
140        if (payload != null) {
141            answer = session.createBytesMessage();
142            ((BytesMessage) answer).writeBytes(payload);
143        } else {
144            if (textMessageSize > 0) {
145                if (messageText == null) {
146                    messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
147                }
148            } else if (payloadUrl != null) {
149                messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
150            } else if (message != null) {
151                messageText = message;
152            } else {
153                messageText = createDefaultMessage(i);
154            }
155            answer = session.createTextMessage(messageText);
156        }
157        if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
158            answer.setStringProperty("JMSXGroupID", msgGroupID);
159        }
160        return answer;
161    }
162
163    private String readInputStream(InputStream is, int size, int messageNumber) throws IOException {
164        InputStreamReader reader = new InputStreamReader(is);
165        try {
166            char[] buffer;
167            if (size > 0) {
168                buffer = new char[size];
169            } else {
170                buffer = new char[1024];
171            }
172            int count;
173            StringBuilder builder = new StringBuilder();
174            while ((count = reader.read(buffer)) != -1) {
175                builder.append(buffer, 0, count);
176                if (size > 0) break;
177            }
178            return builder.toString();
179        } catch (IOException ioe) {
180            return createDefaultMessage(messageNumber);
181        } finally {
182            reader.close();
183        }
184    }
185
186    private String createDefaultMessage(int messageNumber) {
187        return "test message: " + messageNumber;
188    }
189
190    public void setMessageCount(int messageCount) {
191        this.messageCount = messageCount;
192    }
193
194    public int getSleep() {
195        return sleep;
196    }
197
198    public void setSleep(int sleep) {
199        this.sleep = sleep;
200    }
201
202    public int getMessageCount() {
203        return messageCount;
204    }
205
206    public int getSentCount() {
207        return sentCount.get();
208    }
209
210    public boolean isPersistent() {
211        return persistent;
212    }
213
214    public void setPersistent(boolean persistent) {
215        this.persistent = persistent;
216    }
217
218    public boolean isRunning() {
219        return running;
220    }
221
222    public void setRunning(boolean running) {
223        this.running = running;
224    }
225
226    public long getMsgTTL() {
227        return msgTTL;
228    }
229
230    public void setMsgTTL(long msgTTL) {
231        this.msgTTL = msgTTL;
232    }
233
234    public int getTransactionBatchSize() {
235        return transactionBatchSize;
236    }
237
238    public void setTransactionBatchSize(int transactionBatchSize) {
239        this.transactionBatchSize = transactionBatchSize;
240    }
241
242    public String getMsgGroupID() {
243        return msgGroupID;
244    }
245
246    public void setMsgGroupID(String msgGroupID) {
247        this.msgGroupID = msgGroupID;
248    }
249
250    public int getTextMessageSize() {
251        return textMessageSize;
252    }
253
254    public void setTextMessageSize(int textMessageSize) {
255        this.textMessageSize = textMessageSize;
256    }
257
258    public int getMessageSize() {
259        return messageSize;
260    }
261
262    public void setMessageSize(int messageSize) {
263        this.messageSize = messageSize;
264    }
265
266    public CountDownLatch getFinished() {
267        return finished;
268    }
269
270    public void setFinished(CountDownLatch finished) {
271        this.finished = finished;
272    }
273
274    public String getPayloadUrl() {
275        return payloadUrl;
276    }
277
278    public void setPayloadUrl(String payloadUrl) {
279        this.payloadUrl = payloadUrl;
280    }
281
282    public String getMessage() {
283        return message;
284    }
285
286    public void setMessage(String message) {
287        this.message = message;
288    }
289
290    public boolean isRunIndefinitely() {
291        return runIndefinitely;
292    }
293
294    public void setRunIndefinitely(boolean runIndefinitely) {
295        this.runIndefinitely = runIndefinitely;
296    }
297
298    public synchronized void pauseProducer(){
299        this.paused = new CountDownLatch(1);
300    }
301
302    public synchronized void resumeProducer(){
303        this.paused.countDown();
304    }
305
306    public void resetCounters(){
307        this.sentCount.set(0);
308    }
309}