001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.web; 019 020import java.io.Externalizable; 021import java.io.IOException; 022import java.io.ObjectInput; 023import java.io.ObjectOutput; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.Semaphore; 030 031import javax.jms.Connection; 032import javax.jms.ConnectionFactory; 033import javax.jms.DeliveryMode; 034import javax.jms.Destination; 035import javax.jms.JMSException; 036import javax.jms.Message; 037import javax.jms.MessageConsumer; 038import javax.jms.MessageProducer; 039import javax.jms.Session; 040import javax.servlet.ServletContext; 041import javax.servlet.http.HttpServletRequest; 042import javax.servlet.http.HttpSession; 043import javax.servlet.http.HttpSessionActivationListener; 044import javax.servlet.http.HttpSessionBindingEvent; 045import javax.servlet.http.HttpSessionBindingListener; 046import javax.servlet.http.HttpSessionEvent; 047 048import org.apache.activemq.ActiveMQConnectionFactory; 049import org.apache.activemq.MessageAvailableConsumer; 050import org.apache.activemq.broker.BrokerRegistry; 051import org.apache.activemq.broker.BrokerService; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Represents a messaging client used from inside a web container typically 057 * stored inside a HttpSession TODO controls to prevent DOS attacks with users 058 * requesting many consumers TODO configure consumers with small prefetch. 059 * 060 * 061 * 062 */ 063public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable { 064 065 public static final String WEB_CLIENT_ATTRIBUTE = "org.apache.activemq.webclient"; 066 public static final String CONNECTION_FACTORY_ATTRIBUTE = "org.apache.activemq.connectionFactory"; 067 public static final String CONNECTION_FACTORY_PREFETCH_PARAM = "org.apache.activemq.connectionFactory.prefetch"; 068 public static final String CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM = "org.apache.activemq.connectionFactory.optimizeAck"; 069 public static final String BROKER_URL_INIT_PARAM = "org.apache.activemq.brokerURL"; 070 public static final String SELECTOR_NAME = "org.apache.activemq.selectorName"; 071 072 private static final Logger LOG = LoggerFactory.getLogger(WebClient.class); 073 074 private static transient ConnectionFactory factory; 075 076 private transient Map<Destination, MessageConsumer> consumers = new HashMap<Destination, MessageConsumer>(); 077 private transient Connection connection; 078 private transient Session session; 079 private transient MessageProducer producer; 080 private int deliveryMode = DeliveryMode.NON_PERSISTENT; 081 public static String selectorName; 082 083 private final Semaphore semaphore = new Semaphore(1); 084 085 private String username; 086 private String password; 087 088 public WebClient() { 089 if (factory == null) { 090 throw new IllegalStateException("initContext(ServletContext) not called"); 091 } 092 } 093 094 /** 095 * Helper method to get the client for the current session, lazily creating 096 * a client if there is none currently 097 * 098 * @param request is the current HTTP request 099 * @return the current client or a newly creates 100 */ 101 public static WebClient getWebClient(HttpServletRequest request) { 102 HttpSession session = request.getSession(true); 103 WebClient client = getWebClient(session); 104 if (client == null || client.isClosed()) { 105 client = WebClient.createWebClient(request); 106 session.setAttribute(WEB_CLIENT_ATTRIBUTE, client); 107 } 108 109 return client; 110 } 111 112 /** 113 * @return the web client for the current HTTP session or null if there is 114 * not a web client created yet 115 */ 116 public static WebClient getWebClient(HttpSession session) { 117 return (WebClient)session.getAttribute(WEB_CLIENT_ATTRIBUTE); 118 } 119 120 public static void initContext(ServletContext context) { 121 initConnectionFactory(context); 122 context.setAttribute("webClients", new HashMap<String, WebClient>()); 123 if (selectorName == null) { 124 selectorName = context.getInitParameter(SELECTOR_NAME); 125 } 126 if (selectorName == null) { 127 selectorName = "selector"; 128 } 129 } 130 131 public int getDeliveryMode() { 132 return deliveryMode; 133 } 134 135 public void setDeliveryMode(int deliveryMode) { 136 this.deliveryMode = deliveryMode; 137 } 138 139 public String getUsername() { 140 return username; 141 } 142 143 public void setUsername(String username) { 144 this.username = username; 145 } 146 147 public String getPassword() { 148 return password; 149 } 150 151 public void setPassword(String password) { 152 this.password = password; 153 } 154 155 public synchronized void closeConsumers() { 156 for (Iterator<MessageConsumer> it = consumers.values().iterator(); it.hasNext();) { 157 MessageConsumer consumer = it.next(); 158 it.remove(); 159 try { 160 consumer.setMessageListener(null); 161 if (consumer instanceof MessageAvailableConsumer) { 162 ((MessageAvailableConsumer)consumer).setAvailableListener(null); 163 } 164 consumer.close(); 165 } catch (JMSException e) { 166 LOG.debug("caught exception closing consumer", e); 167 } 168 } 169 } 170 171 public synchronized void close() { 172 try { 173 if (consumers != null) { 174 closeConsumers(); 175 } 176 if (connection != null) { 177 connection.close(); 178 } 179 } catch (Exception e) { 180 LOG.debug("caught exception closing consumer", e); 181 } finally { 182 producer = null; 183 session = null; 184 connection = null; 185 if (consumers != null) { 186 consumers.clear(); 187 } 188 consumers = null; 189 190 } 191 } 192 193 public boolean isClosed() { 194 return consumers == null; 195 } 196 197 public void writeExternal(ObjectOutput out) throws IOException { 198 if (consumers != null) { 199 out.write(consumers.size()); 200 Iterator<Destination> i = consumers.keySet().iterator(); 201 while (i.hasNext()) { 202 out.writeObject(i.next().toString()); 203 } 204 } else { 205 out.write(-1); 206 } 207 208 } 209 210 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 211 int size = in.readInt(); 212 if (size >= 0) { 213 consumers = new HashMap<Destination, MessageConsumer>(); 214 for (int i = 0; i < size; i++) { 215 String destinationName = in.readObject().toString(); 216 217 try { 218 Destination destination = destinationName.startsWith("topic://") ? (Destination)getSession().createTopic(destinationName) : (Destination)getSession().createQueue(destinationName); 219 consumers.put(destination, getConsumer(destination, null, true)); 220 } catch (JMSException e) { 221 LOG.debug("Caought Exception ", e); 222 IOException ex = new IOException(e.getMessage()); 223 ex.initCause(e.getCause() != null ? e.getCause() : e); 224 throw ex; 225 226 } 227 } 228 } 229 } 230 231 public void send(Destination destination, Message message) throws JMSException { 232 getProducer().send(destination, message); 233 if (LOG.isDebugEnabled()) { 234 LOG.debug("Sent! to destination: " + destination + " message: " + message); 235 } 236 } 237 238 public void send(Destination destination, Message message, boolean persistent, int priority, long timeToLive) throws JMSException { 239 int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 240 getProducer().send(destination, message, deliveryMode, priority, timeToLive); 241 if (LOG.isDebugEnabled()) { 242 LOG.debug("Sent! to destination: " + destination + " message: " + message); 243 } 244 } 245 246 public Session getSession() throws JMSException { 247 if (session == null) { 248 session = createSession(); 249 } 250 return session; 251 } 252 253 public Connection getConnection() throws JMSException { 254 if (connection == null) { 255 if (username != null && password != null) { 256 connection = factory.createConnection(username, password); 257 } else { 258 connection = factory.createConnection(); 259 } 260 connection.start(); 261 } 262 return connection; 263 } 264 265 protected static synchronized void initConnectionFactory(ServletContext servletContext) { 266 if (factory == null) { 267 factory = (ConnectionFactory)servletContext.getAttribute(CONNECTION_FACTORY_ATTRIBUTE); 268 } 269 if (factory == null) { 270 String brokerURL = servletContext.getInitParameter(BROKER_URL_INIT_PARAM); 271 272 273 if (brokerURL == null) { 274 LOG.debug("Couldn't find " + BROKER_URL_INIT_PARAM + " param, trying to find a broker embedded in a local VM"); 275 BrokerService broker = BrokerRegistry.getInstance().findFirst(); 276 if (broker == null) { 277 throw new IllegalStateException("missing brokerURL (specified via " + BROKER_URL_INIT_PARAM + " init-Param) or embedded broker"); 278 } else { 279 brokerURL = "vm://" + broker.getBrokerName(); 280 } 281 } 282 283 LOG.debug("Using broker URL: " + brokerURL); 284 285 ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL); 286 287 // Set prefetch policy for factory 288 if (servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM) != null) { 289 int prefetch = Integer.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_PREFETCH_PARAM)).intValue(); 290 amqfactory.getPrefetchPolicy().setAll(prefetch); 291 } 292 293 // Set optimize acknowledge setting 294 if (servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM) != null) { 295 boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(CONNECTION_FACTORY_OPTIMIZE_ACK_PARAM)).booleanValue(); 296 amqfactory.setOptimizeAcknowledge(optimizeAck); 297 } 298 299 factory = amqfactory; 300 301 servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory); 302 } 303 } 304 305 public synchronized MessageProducer getProducer() throws JMSException { 306 if (producer == null) { 307 producer = getSession().createProducer(null); 308 producer.setDeliveryMode(deliveryMode); 309 } 310 return producer; 311 } 312 313 public void setProducer(MessageProducer producer) { 314 this.producer = producer; 315 } 316 317 public synchronized MessageConsumer getConsumer(Destination destination, String selector) throws JMSException { 318 return getConsumer(destination, selector, true); 319 } 320 321 public synchronized MessageConsumer getConsumer(Destination destination, String selector, boolean create) throws JMSException { 322 MessageConsumer consumer = consumers.get(destination); 323 if (create && consumer == null) { 324 consumer = getSession().createConsumer(destination, selector); 325 consumers.put(destination, consumer); 326 } 327 return consumer; 328 } 329 330 public synchronized void closeConsumer(Destination destination) throws JMSException { 331 MessageConsumer consumer = consumers.get(destination); 332 if (consumer != null) { 333 consumers.remove(destination); 334 consumer.setMessageListener(null); 335 if (consumer instanceof MessageAvailableConsumer) { 336 ((MessageAvailableConsumer)consumer).setAvailableListener(null); 337 } 338 consumer.close(); 339 } 340 } 341 342 public synchronized List<MessageConsumer> getConsumers() { 343 return new ArrayList<MessageConsumer>(consumers.values()); 344 } 345 346 protected Session createSession() throws JMSException { 347 return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 348 } 349 350 public Semaphore getSemaphore() { 351 return semaphore; 352 } 353 354 public void sessionWillPassivate(HttpSessionEvent event) { 355 close(); 356 } 357 358 public void sessionDidActivate(HttpSessionEvent event) { 359 } 360 361 public void valueBound(HttpSessionBindingEvent event) { 362 } 363 364 public void valueUnbound(HttpSessionBindingEvent event) { 365 close(); 366 } 367 368 protected static WebClient createWebClient(HttpServletRequest request) { 369 WebClient client = new WebClient(); 370 String auth = request.getHeader("Authorization"); 371 if (auth != null) { 372 String[] tokens = auth.split(" "); 373 if (tokens.length == 2) { 374 String encoded = tokens[1].trim(); 375 String credentials = new String(javax.xml.bind.DatatypeConverter.parseBase64Binary(encoded)); 376 String[] creds = credentials.split(":"); 377 if (creds.length == 2) { 378 client.setUsername(creds[0]); 379 client.setPassword(creds[1]); 380 } 381 } 382 } 383 return client; 384 } 385 386}