001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.util.Map; 020 021import javax.management.openmbean.CompositeData; 022import javax.management.openmbean.OpenDataException; 023import javax.jms.JMSException; 024 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.Queue; 027import org.apache.activemq.broker.region.QueueMessageReference; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.util.BrokerSupport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Provides a JMX Management view of a Queue. 036 */ 037public class QueueView extends DestinationView implements QueueViewMBean { 038 private static final Logger LOG = LoggerFactory.getLogger(QueueView.class); 039 040 public QueueView(ManagedRegionBroker broker, Queue destination) { 041 super(broker, destination); 042 } 043 044 public CompositeData getMessage(String messageId) throws OpenDataException { 045 CompositeData result = null; 046 QueueMessageReference ref = ((Queue)destination).getMessage(messageId); 047 048 if (ref != null) { 049 Message rc = ref.getMessage(); 050 if (rc == null) { 051 return null; 052 } 053 result = OpenTypeSupport.convert(rc); 054 } 055 056 return result; 057 } 058 059 public synchronized void purge() throws Exception { 060 final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount(); 061 062 ((Queue)destination).purge(); 063 064 LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount); 065 } 066 067 public synchronized boolean removeMessage(String messageId) throws Exception { 068 return ((Queue)destination).removeMessage(messageId); 069 } 070 071 public synchronized int removeMatchingMessages(String selector) throws Exception { 072 return ((Queue)destination).removeMatchingMessages(selector); 073 } 074 075 public synchronized int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 076 return ((Queue)destination).removeMatchingMessages(selector, maximumMessages); 077 } 078 079 public synchronized boolean copyMessageTo(String messageId, String destinationName) throws Exception { 080 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 081 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 082 return ((Queue)destination).copyMessageTo(context, messageId, toDestination); 083 } 084 085 public synchronized int copyMatchingMessagesTo(String selector, String destinationName) throws Exception { 086 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 087 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 088 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination); 089 } 090 091 public synchronized int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { 092 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 093 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 094 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages); 095 } 096 097 public synchronized boolean moveMessageTo(String messageId, String destinationName) throws Exception { 098 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 099 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 100 return ((Queue)destination).moveMessageTo(context, messageId, toDestination); 101 } 102 103 public synchronized int moveMatchingMessagesTo(String selector, String destinationName) throws Exception { 104 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 105 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 106 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination); 107 } 108 109 public synchronized int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { 110 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 111 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 112 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages); 113 } 114 115 public synchronized int retryMessages() throws Exception { 116 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 117 return ((Queue)destination).retryMessages(context, Integer.MAX_VALUE); 118 } 119 120 /** 121 * Moves a message back to its original destination 122 */ 123 public boolean retryMessage(String messageId) throws Exception { 124 Queue queue = (Queue) destination; 125 QueueMessageReference ref = queue.getMessage(messageId); 126 if (ref == null) { 127 throw new JMSException("Could not find message reference: "+ messageId); 128 } 129 Message rc = ref.getMessage(); 130 if (rc != null) { 131 ActiveMQDestination originalDestination = rc.getOriginalDestination(); 132 if (originalDestination != null) { 133 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 134 return queue.moveMessageTo(context, ref, originalDestination); 135 } 136 else { 137 throw new JMSException("No original destination for message: "+ messageId); 138 } 139 } 140 else { 141 throw new JMSException("Could not find message: "+ messageId); 142 } 143 } 144 145 public int cursorSize() { 146 Queue queue = (Queue) destination; 147 if (queue.getMessages() != null){ 148 return queue.getMessages().size(); 149 } 150 return 0; 151 } 152 153 154 public boolean doesCursorHaveMessagesBuffered() { 155 Queue queue = (Queue) destination; 156 if (queue.getMessages() != null){ 157 return queue.getMessages().hasMessagesBufferedToDeliver(); 158 } 159 return false; 160 161 } 162 163 164 public boolean doesCursorHaveSpace() { 165 Queue queue = (Queue) destination; 166 if (queue.getMessages() != null){ 167 return queue.getMessages().hasSpace(); 168 } 169 return false; 170 } 171 172 173 public long getCursorMemoryUsage() { 174 Queue queue = (Queue) destination; 175 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){ 176 return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage(); 177 } 178 return 0; 179 } 180 181 public int getCursorPercentUsage() { 182 Queue queue = (Queue) destination; 183 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){ 184 return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage(); 185 } 186 return 0; 187 } 188 189 public boolean isCursorFull() { 190 Queue queue = (Queue) destination; 191 if (queue.getMessages() != null){ 192 return queue.getMessages().isFull(); 193 } 194 return false; 195 } 196 197 public boolean isCacheEnabled() { 198 Queue queue = (Queue) destination; 199 if (queue.getMessages() != null){ 200 return queue.getMessages().isCacheEnabled(); 201 } 202 return false; 203 } 204 205 /** 206 * @return a Map of groupNames and ConsumerIds 207 */ 208 @Override 209 public Map<String, String> getMessageGroups() { 210 Queue queue = (Queue) destination; 211 return queue.getMessageGroupOwners().getGroups(); 212 } 213 214 /** 215 * @return the message group type implementation (simple,bucket,cached) 216 */ 217 @Override 218 public String getMessageGroupType() { 219 Queue queue = (Queue) destination; 220 return queue.getMessageGroupOwners().getType(); 221 } 222 223 /** 224 * remove a message group = has the effect of rebalancing group 225 */ 226 @Override 227 public void removeMessageGroup(@MBeanInfo("groupName") String groupName) { 228 Queue queue = (Queue) destination; 229 queue.getMessageGroupOwners().removeGroup(groupName); 230 } 231 232 /** 233 * remove all the message groups - will rebalance all message groups across consumers 234 */ 235 @Override 236 public void removeAllMessageGroups() { 237 Queue queue = (Queue) destination; 238 queue.getMessageGroupOwners().removeAll(); 239 } 240 241 @Override 242 public void pause() { 243 Queue queue = (Queue) destination; 244 queue.pauseDispatch(); 245 } 246 247 @Override 248 public void resume() { 249 Queue queue = (Queue) destination; 250 queue.resumeDispatch(); 251 } 252 253 @Override 254 public boolean isPaused() { 255 Queue queue = (Queue) destination; 256 return queue.isDispatchPaused(); 257 } 258}