001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.util.Map;
020
021import javax.management.openmbean.CompositeData;
022import javax.management.openmbean.OpenDataException;
023import javax.jms.JMSException;
024
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.Queue;
027import org.apache.activemq.broker.region.QueueMessageReference;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.util.BrokerSupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Provides a JMX Management view of a Queue.
036 */
037public class QueueView extends DestinationView implements QueueViewMBean {
038    private static final Logger LOG = LoggerFactory.getLogger(QueueView.class);
039
040    public QueueView(ManagedRegionBroker broker, Queue destination) {
041        super(broker, destination);
042    }
043
044    public CompositeData getMessage(String messageId) throws OpenDataException {
045        CompositeData result = null;
046        QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
047
048        if (ref != null) {
049                Message rc = ref.getMessage();
050                if (rc == null) {
051                    return null;
052                }
053                result = OpenTypeSupport.convert(rc);
054        }
055
056        return result;
057    }
058
059    public synchronized void purge() throws Exception {
060        final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount();
061
062        ((Queue)destination).purge();
063
064        LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
065    }
066
067    public synchronized boolean removeMessage(String messageId) throws Exception {
068        return ((Queue)destination).removeMessage(messageId);
069    }
070
071    public synchronized int removeMatchingMessages(String selector) throws Exception {
072        return ((Queue)destination).removeMatchingMessages(selector);
073    }
074
075    public synchronized int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
076        return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
077    }
078
079    public synchronized boolean copyMessageTo(String messageId, String destinationName) throws Exception {
080        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
081        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
082        return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
083    }
084
085    public synchronized int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
086        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
087        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
088        return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
089    }
090
091    public synchronized int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
092        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
093        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
094        return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
095    }
096
097    public synchronized boolean moveMessageTo(String messageId, String destinationName) throws Exception {
098        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
099        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
100        return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
101    }
102
103    public synchronized int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
104        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
105        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
106        return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
107    }
108
109    public synchronized int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
110        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
111        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
112        return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
113    }
114
115    public synchronized int retryMessages() throws Exception {
116        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
117        return ((Queue)destination).retryMessages(context, Integer.MAX_VALUE);
118    }
119
120    /**
121     * Moves a message back to its original destination
122     */
123    public boolean retryMessage(String messageId) throws Exception {
124        Queue queue = (Queue) destination;
125        QueueMessageReference ref = queue.getMessage(messageId);
126        if (ref == null) {
127            throw new JMSException("Could not find message reference: "+ messageId);
128        }
129        Message rc = ref.getMessage();
130        if (rc != null) {
131            ActiveMQDestination originalDestination = rc.getOriginalDestination();
132            if (originalDestination != null) {
133                ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
134                return queue.moveMessageTo(context, ref, originalDestination);
135            }
136            else {
137                throw new JMSException("No original destination for message: "+ messageId);
138            }
139        }
140        else {
141            throw new JMSException("Could not find message: "+ messageId);
142        }
143    }
144
145    public int cursorSize() {
146        Queue queue = (Queue) destination;
147        if (queue.getMessages() != null){
148            return queue.getMessages().size();
149        }
150        return 0;
151    }
152
153
154    public boolean doesCursorHaveMessagesBuffered() {
155       Queue queue = (Queue) destination;
156       if (queue.getMessages() != null){
157           return queue.getMessages().hasMessagesBufferedToDeliver();
158       }
159       return false;
160
161    }
162
163
164    public boolean doesCursorHaveSpace() {
165        Queue queue = (Queue) destination;
166        if (queue.getMessages() != null){
167            return queue.getMessages().hasSpace();
168        }
169        return false;
170    }
171
172
173    public long getCursorMemoryUsage() {
174        Queue queue = (Queue) destination;
175        if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
176            return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
177        }
178        return 0;
179    }
180
181    public int getCursorPercentUsage() {
182        Queue queue = (Queue) destination;
183        if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
184            return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
185        }
186        return 0;
187    }
188
189    public boolean isCursorFull() {
190        Queue queue = (Queue) destination;
191        if (queue.getMessages() != null){
192            return queue.getMessages().isFull();
193        }
194        return false;
195    }
196
197    public boolean isCacheEnabled() {
198        Queue queue = (Queue) destination;
199        if (queue.getMessages() != null){
200            return queue.getMessages().isCacheEnabled();
201        }
202        return false;
203    }
204
205    /**
206     * @return a Map of groupNames and ConsumerIds
207     */
208    @Override
209    public Map<String, String> getMessageGroups() {
210        Queue queue = (Queue) destination;
211        return queue.getMessageGroupOwners().getGroups();
212    }
213
214    /**
215     * @return the message group type implementation (simple,bucket,cached)
216     */
217    @Override
218    public String getMessageGroupType() {
219        Queue queue = (Queue) destination;
220        return queue.getMessageGroupOwners().getType();
221    }
222
223    /**
224     * remove a message group = has the effect of rebalancing group
225     */
226    @Override
227    public void removeMessageGroup(@MBeanInfo("groupName") String groupName) {
228        Queue queue = (Queue) destination;
229        queue.getMessageGroupOwners().removeGroup(groupName);
230    }
231
232    /**
233     * remove all the message groups - will rebalance all message groups across consumers
234     */
235    @Override
236    public void removeAllMessageGroups() {
237        Queue queue = (Queue) destination;
238        queue.getMessageGroupOwners().removeAll();
239    }
240
241    @Override
242    public void pause() {
243        Queue queue = (Queue) destination;
244        queue.pauseDispatch();
245    }
246
247    @Override
248    public void resume() {
249        Queue queue = (Queue) destination;
250        queue.resumeDispatch();
251    }
252
253    @Override
254    public boolean isPaused() {
255        Queue queue = (Queue) destination;
256        return queue.isDispatchPaused();
257    }
258}