001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019import org.slf4j.Logger; 020import org.slf4j.LoggerFactory; 021 022import javax.jms.*; 023import java.util.concurrent.CountDownLatch; 024 025public class ConsumerThread extends Thread { 026 027 private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class); 028 029 int messageCount = 1000; 030 int receiveTimeOut = 3000; 031 Destination destination; 032 Session session; 033 boolean durable; 034 boolean breakOnNull = true; 035 int sleep; 036 int batchSize; 037 038 int received = 0; 039 int transactions = 0; 040 boolean running = false; 041 CountDownLatch finished; 042 boolean bytesAsText; 043 044 public ConsumerThread(Session session, Destination destination) { 045 this.destination = destination; 046 this.session = session; 047 } 048 049 @Override 050 public void run() { 051 running = true; 052 MessageConsumer consumer = null; 053 String threadName = Thread.currentThread().getName(); 054 LOG.info(threadName + " wait until " + messageCount + " messages are consumed"); 055 try { 056 if (durable && destination instanceof Topic) { 057 consumer = session.createDurableSubscriber((Topic) destination, getName()); 058 } else { 059 consumer = session.createConsumer(destination); 060 } 061 while (running && received < messageCount) { 062 Message msg = consumer.receive(receiveTimeOut); 063 if (msg != null) { 064 LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID())); 065 if (bytesAsText && (msg instanceof BytesMessage)) { 066 long length = ((BytesMessage) msg).getBodyLength(); 067 byte[] bytes = new byte[(int) length]; 068 ((BytesMessage) msg).readBytes(bytes); 069 LOG.info("BytesMessage as text string: " + new String(bytes)); 070 } 071 received++; 072 } else { 073 if (breakOnNull) { 074 break; 075 } 076 } 077 078 if (session.getTransacted()) { 079 if (batchSize > 0 && received > 0 && received % batchSize == 0) { 080 LOG.info(threadName + " Committing transaction: " + transactions++); 081 session.commit(); 082 } 083 } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { 084 if (batchSize > 0 && received > 0 && received % batchSize == 0) { 085 LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received); 086 msg.acknowledge(); 087 } 088 } 089 if (sleep > 0) { 090 Thread.sleep(sleep); 091 } 092 093 } 094 } catch (Exception e) { 095 e.printStackTrace(); 096 } finally { 097 if (finished != null) { 098 finished.countDown(); 099 } 100 if (consumer != null) { 101 LOG.info(threadName + " Consumed: " + this.getReceived() + " messages"); 102 try { 103 consumer.close(); 104 } catch (JMSException e) { 105 e.printStackTrace(); 106 } 107 } 108 } 109 110 LOG.info(threadName + " Consumer thread finished"); 111 } 112 113 public int getReceived() { 114 return received; 115 } 116 117 public boolean isDurable() { 118 return durable; 119 } 120 121 public void setDurable(boolean durable) { 122 this.durable = durable; 123 } 124 125 public void setMessageCount(int messageCount) { 126 this.messageCount = messageCount; 127 } 128 129 public void setBreakOnNull(boolean breakOnNull) { 130 this.breakOnNull = breakOnNull; 131 } 132 133 public int getBatchSize() { 134 return batchSize; 135 } 136 137 public void setBatchSize(int batchSize) { 138 this.batchSize = batchSize; 139 } 140 141 public int getMessageCount() { 142 return messageCount; 143 } 144 145 public boolean isBreakOnNull() { 146 return breakOnNull; 147 } 148 149 public int getReceiveTimeOut() { 150 return receiveTimeOut; 151 } 152 153 public void setReceiveTimeOut(int receiveTimeOut) { 154 this.receiveTimeOut = receiveTimeOut; 155 } 156 157 public boolean isRunning() { 158 return running; 159 } 160 161 public void setRunning(boolean running) { 162 this.running = running; 163 } 164 165 public int getSleep() { 166 return sleep; 167 } 168 169 public void setSleep(int sleep) { 170 this.sleep = sleep; 171 } 172 173 public CountDownLatch getFinished() { 174 return finished; 175 } 176 177 public void setFinished(CountDownLatch finished) { 178 this.finished = finished; 179 } 180 181 public boolean isBytesAsText() { 182 return bytesAsText; 183 } 184 185 public void setBytesAsText(boolean bytesAsText) { 186 this.bytesAsText = bytesAsText; 187 } 188}