/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.console.filter;

import java.net.URI;
import java.util.Collections;
import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.QueueBrowser;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * Query filter that retrieves JMS messages from a destination, using the
 * supplied queries as a message selector.
 * <p>
 * The filter connects either through a caller-supplied
 * {@link ConnectionFactory} or, when none was supplied, through an
 * {@link ActiveMQConnectionFactory} created from the broker url on first use.
 */
public class AmqMessagesQueryFilter extends AbstractQueryFilter {

    /** Separator placed between the individual selector clauses. */
    private static final String SELECTOR_JOIN = " AND ";

    private URI brokerUrl;
    private Destination destination;

    // Created lazily from brokerUrl in createConnection() when not supplied.
    private ConnectionFactory connectionFactory;

    /**
     * Create a JMS message query filter
     *
     * @param brokerUrl - broker url to connect to
     * @param destination - JMS destination to query
     */
    public AmqMessagesQueryFilter(URI brokerUrl, Destination destination) {
        super(null);
        this.brokerUrl = brokerUrl;
        this.destination = destination;
    }

    /**
     * Create a JMS message query filter
     *
     * @param connectionFactory - to connect with
     * @param destination - JMS destination to query
     */
    public AmqMessagesQueryFilter(ConnectionFactory connectionFactory, Destination destination) {
        super(null);
        this.destination = destination;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Queries the specified destination using the message selector format
     * query. Each query is wrapped in parentheses and the clauses are joined
     * with {@code AND}; an empty query list yields an empty selector.
     * <p>
     * An {@link ActiveMQQueue} destination is browsed; any other destination
     * is cast to {@link ActiveMQTopic} and handled by the topic variant.
     *
     * @param queries - message selector queries
     * @return list messages that matches the selector
     * @throws Exception if the destination cannot be queried
     */
    public List query(List queries) throws Exception {
        // Convert to message selector: "(q1) AND (q2) AND ..."
        StringBuilder selector = new StringBuilder();
        for (Object query : queries) {
            // Emit the separator before every clause but the first, so no
            // trailing " AND " has to be trimmed afterwards.
            if (selector.length() > 0) {
                selector.append(SELECTOR_JOIN);
            }
            selector.append('(').append(query.toString()).append(')');
        }

        if (destination instanceof ActiveMQQueue) {
            return queryMessages((ActiveMQQueue) destination, selector.toString());
        } else {
            return queryMessages((ActiveMQTopic) destination, selector.toString());
        }
    }

    /**
     * Query the messages of a queue destination using a queue browser
     *
     * @param queue - queue destination
     * @param selector - message selector
     * @return list of messages that matches the selector
     * @throws Exception if connecting to or browsing the queue fails
     */
    protected List queryMessages(ActiveMQQueue queue, String selector) throws Exception {
        Connection conn = createConnection();
        try {
            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser browser = sess.createBrowser(queue, selector);

            // Copy the enumeration into a list before the connection is closed.
            return Collections.list(browser.getEnumeration());
        } finally {
            // Always release the connection, even when creating the session
            // or the browser fails.
            conn.close();
        }
    }

    /**
     * Query the messages of a topic destination using a message consumer.
     * <p>
     * Topic querying is not implemented yet; this method currently always
     * returns {@code null}.
     *
     * @param topic - topic destination
     * @param selector - message selector
     * @return currently always {@code null}
     * @throws Exception declared for future implementations and overrides
     */
    protected List queryMessages(ActiveMQTopic topic, String selector) throws Exception {
        // TODO: should we use a durable subscriber or a retroactive non-durable
        // subscriber?
        // TODO: if a durable subscriber is used, how do we manage it?
        // subscribe/unsubscribe tasks?
        return null;
    }

    /**
     * Create and start a JMS connection
     *
     * @return JMS connection
     * @throws JMSException if the connection cannot be created or started
     */
    protected Connection createConnection() throws JMSException {
        // maintain old behaviour, when called either way.
        if (null == connectionFactory) {
            connectionFactory = new ActiveMQConnectionFactory(getBrokerUrl());
        }

        Connection conn = connectionFactory.createConnection();
        try {
            conn.start();
        } catch (JMSException e) {
            // Do not leak the connection when it cannot be started.
            try {
                conn.close();
            } catch (JMSException ignored) {
                // Best effort only: the start failure is the error to report.
            }
            throw e;
        }
        return conn;
    }

    /**
     * Get the broker url being used.
     *
     * @return broker url
     */
    public URI getBrokerUrl() {
        return brokerUrl;
    }

    /**
     * Set the broker url to use.
     *
     * @param brokerUrl - broker url
     */
    public void setBrokerUrl(URI brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    /**
     * Get the destination being used.
     *
     * @return - JMS destination
     */
    public Destination getDestination() {
        return destination;
    }

    /**
     * Set the destination to use.
     *
     * @param destination - JMS destination
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}