001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.console.command; 018 019import java.net.URI; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.List; 023import java.util.StringTokenizer; 024 025import javax.jms.Destination; 026import javax.jms.Message; 027import javax.management.MBeanServerConnection; 028import javax.management.MBeanServerInvocationHandler; 029import javax.management.ObjectInstance; 030import javax.management.ObjectName; 031import javax.management.openmbean.CompositeData; 032import javax.management.remote.JMXConnector; 033 034import org.apache.activemq.broker.jmx.QueueViewMBean; 035import org.apache.activemq.command.ActiveMQQueue; 036import org.apache.activemq.console.util.AmqMessagesUtil; 037import org.apache.activemq.console.util.JmxMBeansUtil; 038 039public class PurgeCommand extends AbstractJmxCommand { 040 041 protected String[] helpFile = new String[] { 042 "Task Usage: Main purge [browse-options] <destinations>", 043 "Description: Delete selected destination's messages that matches the message selector.", 044 "", 045 "Purge Options:", 046 " --msgsel <msgsel1,msglsel2> Add to the search list messages matched by the query similar to", 047 " the messages selector format.", 048 " --jmxurl <url> Set the JMX URL to connect to.", 049 " --pid <pid> Set the pid to connect to (only on Sun JVM).", 050 " --jmxuser <user> Set the JMX user used for authenticating.", 051 " --jmxpassword <password> Set the JMX password used for authenticating.", 052 " --jmxlocal Use the local JMX server instead of a remote one.", 053 " --version Display the version information.", 054 " -h,-?,--help Display the browse broker help information.", 055 "", 056 "Examples:", 057 " Main purge FOO.BAR", 058 " - Delete all the messages in queue FOO.BAR", 059 060 " Main purge --msgsel \"JMSMessageID='*:10',JMSPriority>5\" FOO.*", 061 " - Delete all the messages in the destinations that matches FOO.* and has a JMSMessageID in", 062 " the header field that matches the wildcard *:10, and has a JMSPriority field > 5 in the", 063 " queue FOO.BAR.", 064 " SLQ92 syntax is also supported.", 065 " * To use wildcard queries, the field must be a string and the query enclosed in ''", 066 " Use double quotes \"\" around the entire message selector string.", 067 "" 068 }; 069 070 private final List<String> queryAddObjects = new ArrayList<String>(10); 071 private final List<String> querySubObjects = new ArrayList<String>(10); 072 073 @Override 074 public String getName() { 075 return "purge"; 076 } 077 078 @Override 079 public String getOneLineDescription() { 080 return "Delete selected destination's messages that matches the message selector"; 081 } 082 083 /** 084 * Execute the purge command, which allows you to purge the messages in a 085 * given JMS destination 086 * 087 * @param tokens - command arguments 088 * @throws Exception 089 */ 090 protected void runTask(List<String> tokens) throws Exception { 091 try { 092 // If there is no queue name specified, let's select all 093 if (tokens.isEmpty()) { 094 tokens.add("*"); 095 } 096 097 // Iterate through the queue names 098 for (Iterator<String> i = tokens.iterator(); i.hasNext();) { 099 List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + i.next()); 100 101 for (Iterator j = queueList.iterator(); j.hasNext();) { 102 ObjectName queueName = ((ObjectInstance)j.next()).getObjectName(); 103 if (queryAddObjects.isEmpty()) { 104 purgeQueue(queueName); 105 } else { 106 107 QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler. 108 newProxyInstance(createJmxConnection(), 109 queueName, 110 QueueViewMBean.class, 111 true); 112 int removed = 0; 113 114 // AMQ-3404: We support two syntaxes for the message 115 // selector query: 116 // 1) AMQ specific: 117 // "JMSPriority>2,MyHeader='Foo'" 118 // 119 // 2) SQL-92 syntax: 120 // "(JMSPriority>2) AND (MyHeader='Foo')" 121 // 122 // If syntax style 1) is used, the comma separated 123 // criterias are broken into List<String> elements. 124 // We then need to construct the SQL-92 query out of 125 // this list. 126 127 String sqlQuery = null; 128 if (queryAddObjects.size() > 1) { 129 sqlQuery = convertToSQL92(queryAddObjects); 130 } else { 131 sqlQuery = queryAddObjects.get(0); 132 } 133 removed = proxy.removeMatchingMessages(sqlQuery); 134 context.printInfo("Removed: " + removed 135 + " messages for message selector " + sqlQuery.toString()); 136 } 137 } 138 } 139 } catch (Exception e) { 140 context.printException(new RuntimeException("Failed to execute purge task. Reason: " + e)); 141 throw new Exception(e); 142 } 143 } 144 145 146 /** 147 * Purge all the messages in the queue 148 * 149 * @param queue - ObjectName of the queue to purge 150 * @throws Exception 151 */ 152 public void purgeQueue(ObjectName queue) throws Exception { 153 context.printInfo("Purging all messages in queue: " + queue.getKeyProperty("destinationName")); 154 createJmxConnection().invoke(queue, "purge", new Object[] {}, new String[] {}); 155 } 156 157 /** 158 * Handle the --msgsel, --xmsgsel. 159 * 160 * @param token - option token to handle 161 * @param tokens - succeeding command arguments 162 * @throws Exception 163 */ 164 protected void handleOption(String token, List<String> tokens) throws Exception { 165 // If token is an additive message selector option 166 if (token.startsWith("--msgsel")) { 167 168 // If no message selector is specified, or next token is a new 169 // option 170 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { 171 context.printException(new IllegalArgumentException("Message selector not specified")); 172 return; 173 } 174 175 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); 176 while (queryTokens.hasMoreTokens()) { 177 queryAddObjects.add(queryTokens.nextToken()); 178 } 179 } else if (token.startsWith("--xmsgsel")) { 180 // If token is a substractive message selector option 181 182 // If no message selector is specified, or next token is a new 183 // option 184 if (tokens.isEmpty() || ((String)tokens.get(0)).startsWith("-")) { 185 context.printException(new IllegalArgumentException("Message selector not specified")); 186 return; 187 } 188 189 StringTokenizer queryTokens = new StringTokenizer((String)tokens.remove(0), COMMAND_OPTION_DELIMETER); 190 while (queryTokens.hasMoreTokens()) { 191 querySubObjects.add(queryTokens.nextToken()); 192 } 193 194 } else { 195 // Let super class handle unknown option 196 super.handleOption(token, tokens); 197 } 198 } 199 200 /** 201 * Converts the message selector as provided on command line 202 * argument to activem-admin into an SQL-92 conform string. 203 * E.g. 204 * "JMSMessageID='*:10',JMSPriority>5" 205 * gets converted into 206 * "(JMSMessageID='%:10') AND (JMSPriority>5)" 207 * 208 * @param tokens - List of message selector query parameters 209 * @return SQL-92 string of that query. 210 */ 211 public String convertToSQL92(List<String> tokens) { 212 String selector = ""; 213 214 // Convert to message selector 215 for (Iterator i = tokens.iterator(); i.hasNext(); ) { 216 selector = selector + "(" + i.next().toString() + ") AND "; 217 } 218 219 // Remove last AND and replace '*' with '%' 220 if (!selector.equals("")) { 221 selector = selector.substring(0, selector.length() - 5); 222 selector = selector.replace('*', '%'); 223 } 224 return selector; 225 } 226 227 228 /** 229 * Print the help messages for the browse command 230 */ 231 protected void printHelp() { 232 context.printHelp(helpFile); 233 } 234 235}