001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import org.slf4j.Logger;
020import org.slf4j.LoggerFactory;
021
022import javax.jms.*;
023import java.util.concurrent.CountDownLatch;
024
025public class ConsumerThread extends Thread {
026
027    private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
028
029    int messageCount = 1000;
030    int receiveTimeOut = 3000;
031    Destination destination;
032    Session session;
033    boolean durable;
034    boolean breakOnNull = true;
035    int sleep;
036    int batchSize;
037
038    int received = 0;
039    int transactions = 0;
040    boolean running = false;
041    CountDownLatch finished;
042    boolean bytesAsText;
043
044    public ConsumerThread(Session session, Destination destination) {
045        this.destination = destination;
046        this.session = session;
047    }
048
049    @Override
050    public void run() {
051        running = true;
052        MessageConsumer consumer = null;
053        String threadName = Thread.currentThread().getName();
054        LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
055        try {
056            if (durable && destination instanceof Topic) {
057                consumer = session.createDurableSubscriber((Topic) destination, getName());
058            } else {
059                consumer = session.createConsumer(destination);
060            }
061            while (running && received < messageCount) {
062                Message msg = consumer.receive(receiveTimeOut);
063                if (msg != null) {
064                    LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
065                    if (bytesAsText && (msg instanceof BytesMessage)) {
066                        long length = ((BytesMessage) msg).getBodyLength();
067                        byte[] bytes = new byte[(int) length];
068                        ((BytesMessage) msg).readBytes(bytes);
069                        LOG.info("BytesMessage as text string: " + new String(bytes));
070                    }
071                    received++;
072                } else {
073                    if (breakOnNull) {
074                        break;
075                    }
076                }
077
078                if (session.getTransacted()) {
079                    if (batchSize > 0 && received > 0 && received % batchSize == 0) {
080                        LOG.info(threadName + " Committing transaction: " + transactions++);
081                        session.commit();
082                    }
083                } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
084                    if (batchSize > 0 && received > 0 && received % batchSize == 0) {
085                        LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received);
086                        msg.acknowledge();
087                    }
088                }
089                if (sleep > 0) {
090                    Thread.sleep(sleep);
091                }
092
093            }
094        } catch (Exception e) {
095            e.printStackTrace();
096        } finally {
097            if (finished != null) {
098                finished.countDown();
099            }
100            if (consumer != null) {
101                LOG.info(threadName + " Consumed: " + this.getReceived() + " messages");
102                try {
103                    consumer.close();
104                } catch (JMSException e) {
105                    e.printStackTrace();
106                }
107            }
108        }
109
110        LOG.info(threadName + " Consumer thread finished");
111    }
112
113    public int getReceived() {
114        return received;
115    }
116
117    public boolean isDurable() {
118        return durable;
119    }
120
121    public void setDurable(boolean durable) {
122        this.durable = durable;
123    }
124
125    public void setMessageCount(int messageCount) {
126        this.messageCount = messageCount;
127    }
128
129    public void setBreakOnNull(boolean breakOnNull) {
130        this.breakOnNull = breakOnNull;
131    }
132
133    public int getBatchSize() {
134        return batchSize;
135    }
136
137    public void setBatchSize(int batchSize) {
138        this.batchSize = batchSize;
139    }
140
141    public int getMessageCount() {
142        return messageCount;
143    }
144
145    public boolean isBreakOnNull() {
146        return breakOnNull;
147    }
148
149    public int getReceiveTimeOut() {
150        return receiveTimeOut;
151    }
152
153    public void setReceiveTimeOut(int receiveTimeOut) {
154        this.receiveTimeOut = receiveTimeOut;
155    }
156
157    public boolean isRunning() {
158        return running;
159    }
160
161    public void setRunning(boolean running) {
162        this.running = running;
163    }
164
165    public int getSleep() {
166        return sleep;
167    }
168
169    public void setSleep(int sleep) {
170        this.sleep = sleep;
171    }
172
173    public CountDownLatch getFinished() {
174        return finished;
175    }
176
177    public void setFinished(CountDownLatch finished) {
178        this.finished = finished;
179    }
180
181    public boolean isBytesAsText() {
182        return bytesAsText;
183    }
184
185    public void setBytesAsText(boolean bytesAsText) {
186        this.bytesAsText = bytesAsText;
187    }
188}