001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.web; 018 019import java.io.IOException; 020import java.io.PrintWriter; 021import java.io.StringWriter; 022import java.util.HashMap; 023import java.util.Iterator; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.Map; 027import java.util.Timer; 028import java.util.TimerTask; 029 030import javax.jms.Destination; 031import javax.jms.JMSException; 032import javax.jms.Message; 033import javax.jms.MessageConsumer; 034import javax.jms.ObjectMessage; 035import javax.jms.TextMessage; 036import javax.servlet.ServletConfig; 037import javax.servlet.ServletException; 038import javax.servlet.http.HttpServletRequest; 039import javax.servlet.http.HttpServletResponse; 040import javax.servlet.http.HttpSession; 041 042import org.apache.activemq.MessageAvailableConsumer; 043import org.eclipse.jetty.continuation.Continuation; 044import org.eclipse.jetty.continuation.ContinuationListener; 045import org.eclipse.jetty.continuation.ContinuationSupport; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A servlet for sending and receiving messages to/from JMS destinations using 051 * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the 052 * destination and whether it is a topic or queue via configuration details on 053 * the servlet or as request parameters. <p/> For reading messages you can 054 * specify a readTimeout parameter to determine how long the servlet should 055 * block for. The servlet can be configured with the following init parameters: 056 * <dl> 057 * <dt>defaultReadTimeout</dt> 058 * <dd>The default time in ms to wait for messages. May be overridden by a 059 * request using the 'timeout' parameter</dd> 060 * <dt>maximumReadTimeout</dt> 061 * <dd>The maximum value a request may specify for the 'timeout' parameter</dd> 062 * <dt>maximumMessages</dt> 063 * <dd>maximum messages to send per response</dd> 064 * <dt></dt> 065 * <dd></dd> 066 * </dl> 067 * 068 * 069 */ 070@SuppressWarnings("serial") 071public class MessageListenerServlet extends MessageServletSupport { 072 private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class); 073 074 private final String readTimeoutParameter = "timeout"; 075 private long defaultReadTimeout = -1; 076 private long maximumReadTimeout = 25000; 077 private int maximumMessages = 100; 078 private final Timer clientCleanupTimer = new Timer("ActiveMQ Ajax Client Cleanup Timer", true); 079 private final HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>(); 080 081 @Override 082 public void init() throws ServletException { 083 ServletConfig servletConfig = getServletConfig(); 084 String name = servletConfig.getInitParameter("defaultReadTimeout"); 085 if (name != null) { 086 defaultReadTimeout = asLong(name); 087 } 088 name = servletConfig.getInitParameter("maximumReadTimeout"); 089 if (name != null) { 090 maximumReadTimeout = asLong(name); 091 } 092 name = servletConfig.getInitParameter("maximumMessages"); 093 if (name != null) { 094 maximumMessages = (int)asLong(name); 095 } 096 clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 ); 097 } 098 099 /** 100 * Sends a message to a destination or manage subscriptions. If the the 101 * content type of the POST is 102 * <code>application/x-www-form-urlencoded</code>, then the form 103 * parameters "destination", "message" and "type" are used to pass a message 104 * or a subscription. If multiple messages or subscriptions are passed in a 105 * single post, then additional parameters are shortened to "dN", "mN" and 106 * "tN" where N is an index starting from 1. The type is either "send", 107 * "listen" or "unlisten". For send types, the message is the text of the 108 * TextMessage, otherwise it is the ID to be used for the subscription. If 109 * the content type is not <code>application/x-www-form-urlencoded</code>, 110 * then the body of the post is sent as the message to a destination that is 111 * derived from a query parameter, the URL or the default destination. 112 * 113 * @param request 114 * @param response 115 * @throws ServletException 116 * @throws IOException 117 */ 118 @Override 119 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 120 121 // lets turn the HTTP post into a JMS Message 122 AjaxWebClient client = getAjaxWebClient( request ); 123 String messageIds = ""; 124 125 synchronized (client) { 126 127 if (LOG.isDebugEnabled()) { 128 LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType()); 129 // dump(request.getParameterMap()); 130 } 131 132 int messages = 0; 133 134 // loop until no more messages 135 while (true) { 136 // Get the message parameters. Multiple messages are encoded 137 // with more compact parameter names. 138 String destinationName = request.getParameter(messages == 0 ? "destination" : ("d" + messages)); 139 140 if (destinationName == null) { 141 destinationName = request.getHeader("destination"); 142 } 143 144 String message = request.getParameter(messages == 0 ? "message" : ("m" + messages)); 145 String type = request.getParameter(messages == 0 ? "type" : ("t" + messages)); 146 147 if (destinationName == null || message == null || type == null) { 148 break; 149 } 150 151 try { 152 Destination destination = getDestination(client, request, destinationName); 153 154 if (LOG.isDebugEnabled()) { 155 LOG.debug(messages + " destination=" + destinationName + " message=" + message + " type=" + type); 156 LOG.debug(destination + " is a " + destination.getClass().getName()); 157 } 158 159 messages++; 160 161 if ("listen".equals(type)) { 162 AjaxListener listener = client.getListener(); 163 Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap(); 164 Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap(); 165 client.closeConsumer(destination); // drop any existing 166 // consumer. 167 MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); 168 169 consumer.setAvailableListener(listener); 170 consumerIdMap.put(consumer, message); 171 consumerDestinationNameMap.put(consumer, destinationName); 172 if (LOG.isDebugEnabled()) { 173 LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message); 174 } 175 } else if ("unlisten".equals(type)) { 176 Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap(); 177 Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap(); 178 MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName)); 179 180 consumer.setAvailableListener(null); 181 consumerIdMap.remove(consumer); 182 consumerDestinationNameMap.remove(consumer); 183 client.closeConsumer(destination); 184 if (LOG.isDebugEnabled()) { 185 LOG.debug("Unsubscribed: " + consumer); 186 } 187 } else if ("send".equals(type)) { 188 TextMessage text = client.getSession().createTextMessage(message); 189 appendParametersToMessage(request, text); 190 191 client.send(destination, text); 192 messageIds += text.getJMSMessageID() + "\n"; 193 if (LOG.isDebugEnabled()) { 194 LOG.debug("Sent " + message + " to " + destination); 195 } 196 } else { 197 LOG.warn("unknown type " + type); 198 } 199 200 } catch (JMSException e) { 201 LOG.warn("jms", e); 202 } 203 } 204 } 205 206 if ("true".equals(request.getParameter("poll"))) { 207 try { 208 // TODO return message IDs 209 doMessages(client, request, response); 210 } catch (JMSException e) { 211 throw new ServletException("JMS problem: " + e, e); 212 } 213 } else { 214 // handle simple POST of a message 215 if (request.getContentLength() != 0 && (request.getContentType() == null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) { 216 try { 217 Destination destination = getDestination(client, request); 218 String body = getPostedMessageBody(request); 219 TextMessage message = client.getSession().createTextMessage(body); 220 appendParametersToMessage(request, message); 221 222 client.send(destination, message); 223 if (LOG.isDebugEnabled()) { 224 LOG.debug("Sent to destination: " + destination + " body: " + body); 225 } 226 messageIds += message.getJMSMessageID() + "\n"; 227 } catch (JMSException e) { 228 throw new ServletException(e); 229 } 230 } 231 232 response.setContentType("text/plain"); 233 response.setHeader("Cache-Control", "no-cache"); 234 response.getWriter().print(messageIds); 235 } 236 } 237 238 /** 239 * Supports a HTTP DELETE to be equivlanent of consuming a singe message 240 * from a queue 241 */ 242 @Override 243 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 244 try { 245 AjaxWebClient client = getAjaxWebClient(request); 246 if (LOG.isDebugEnabled()) { 247 LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString()); 248 } 249 250 doMessages(client, request, response); 251 } catch (JMSException e) { 252 throw new ServletException("JMS problem: " + e, e); 253 } 254 } 255 256 /** 257 * Reads a message from a destination up to some specific timeout period 258 * 259 * @param client The webclient 260 * @param request 261 * @param response 262 * @throws ServletException 263 * @throws IOException 264 */ 265 protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException { 266 267 int messages = 0; 268 // This is a poll for any messages 269 270 long timeout = getReadTimeout(request); 271 if (LOG.isDebugEnabled()) { 272 LOG.debug("doMessage timeout=" + timeout); 273 } 274 275 // this is non-null if we're resuming the continuation. 276 // attributes set in AjaxListener 277 UndeliveredAjaxMessage undelivered_message = null; 278 Message message = null; 279 undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message"); 280 if( undelivered_message != null ) { 281 message = undelivered_message.getMessage(); 282 } 283 284 synchronized (client) { 285 286 List<MessageConsumer> consumers = client.getConsumers(); 287 MessageAvailableConsumer consumer = null; 288 if( undelivered_message != null ) { 289 consumer = (MessageAvailableConsumer)undelivered_message.getConsumer(); 290 } 291 292 if (message == null) { 293 // Look for a message that is ready to go 294 for (int i = 0; message == null && i < consumers.size(); i++) { 295 consumer = (MessageAvailableConsumer)consumers.get(i); 296 if (consumer.getAvailableListener() == null) { 297 continue; 298 } 299 300 // Look for any available messages 301 message = consumer.receive(10); 302 if (LOG.isDebugEnabled()) { 303 LOG.debug("received " + message + " from " + consumer); 304 } 305 } 306 } 307 308 // prepare the response 309 response.setContentType("text/xml"); 310 response.setHeader("Cache-Control", "no-cache"); 311 312 if (message == null && client.getListener().getUndeliveredMessages().size() == 0) { 313 Continuation continuation = ContinuationSupport.getContinuation(request); 314 315 // Add a listener to the continuation to make sure it actually 316 // will expire (seems like a bug in Jetty Servlet 3 continuations, 317 // see https://issues.apache.org/jira/browse/AMQ-3447 318 continuation.addContinuationListener(new ContinuationListener() { 319 @Override 320 public void onTimeout(Continuation cont) { 321 if (LOG.isDebugEnabled()) { 322 LOG.debug("Continuation " + cont.toString() + " expired."); 323 } 324 } 325 326 @Override 327 public void onComplete(Continuation cont) { 328 if (LOG.isDebugEnabled()) { 329 LOG.debug("Continuation " + cont.toString() + " completed."); 330 } 331 } 332 }); 333 334 if (continuation.isExpired()) { 335 response.setStatus(HttpServletResponse.SC_OK); 336 StringWriter swriter = new StringWriter(); 337 PrintWriter writer = new PrintWriter(swriter); 338 writer.println("<ajax-response>"); 339 writer.print("</ajax-response>"); 340 341 writer.flush(); 342 String m = swriter.toString(); 343 response.getWriter().println(m); 344 345 return; 346 } 347 348 continuation.setTimeout(timeout); 349 continuation.suspend(); 350 LOG.debug( "Suspending continuation " + continuation ); 351 352 // Fetch the listeners 353 AjaxListener listener = client.getListener(); 354 listener.access(); 355 356 // register this continuation with our listener. 357 listener.setContinuation(continuation); 358 359 return; 360 } 361 362 StringWriter swriter = new StringWriter(); 363 PrintWriter writer = new PrintWriter(swriter); 364 365 Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap(); 366 Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap(); 367 response.setStatus(HttpServletResponse.SC_OK); 368 writer.println("<ajax-response>"); 369 370 // Send any message we already have 371 if (message != null) { 372 String id = consumerIdMap.get(consumer); 373 String destinationName = consumerDestinationNameMap.get(consumer); 374 LOG.debug( "sending pre-existing message" ); 375 writeMessageResponse(writer, message, id, destinationName); 376 377 messages++; 378 } 379 380 // send messages buffered while continuation was unavailable. 381 LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages(); 382 LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages"); 383 synchronized( undeliveredMessages ) { 384 for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) { 385 messages++; 386 UndeliveredAjaxMessage undelivered = it.next(); 387 Message msg = undelivered.getMessage(); 388 consumer = (MessageAvailableConsumer)undelivered.getConsumer(); 389 String id = consumerIdMap.get(consumer); 390 String destinationName = consumerDestinationNameMap.get(consumer); 391 LOG.debug( "sending undelivered/buffered messages" ); 392 LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName); 393 writeMessageResponse(writer, msg, id, destinationName); 394 it.remove(); 395 if (messages >= maximumMessages) { 396 break; 397 } 398 } 399 } 400 401 // Send the rest of the messages 402 for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) { 403 consumer = (MessageAvailableConsumer)consumers.get(i); 404 if (consumer.getAvailableListener() == null) { 405 continue; 406 } 407 408 // Look for any available messages 409 while (messages < maximumMessages) { 410 message = consumer.receiveNoWait(); 411 if (message == null) { 412 break; 413 } 414 messages++; 415 String id = consumerIdMap.get(consumer); 416 String destinationName = consumerDestinationNameMap.get(consumer); 417 LOG.debug( "sending final available messages" ); 418 writeMessageResponse(writer, message, id, destinationName); 419 } 420 } 421 422 writer.print("</ajax-response>"); 423 424 writer.flush(); 425 String m = swriter.toString(); 426 response.getWriter().println(m); 427 } 428 } 429 430 protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException { 431 writer.print("<response id='"); 432 writer.print(id); 433 writer.print("'"); 434 if (destinationName != null) { 435 writer.print(" destination='" + destinationName + "' "); 436 } 437 writer.print(">"); 438 if (message instanceof TextMessage) { 439 TextMessage textMsg = (TextMessage)message; 440 String txt = textMsg.getText(); 441 if (txt != null) { 442 if (txt.startsWith("<?")) { 443 txt = txt.substring(txt.indexOf("?>") + 2); 444 } 445 writer.print(txt); 446 } 447 } else if (message instanceof ObjectMessage) { 448 ObjectMessage objectMsg = (ObjectMessage)message; 449 Object object = objectMsg.getObject(); 450 if (object != null) { 451 writer.print(object.toString()); 452 } 453 } 454 writer.println("</response>"); 455 } 456 457 /* 458 * Return the AjaxWebClient for this session+clientId. 459 * Create one if it does not already exist. 460 */ 461 protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) { 462 HttpSession session = request.getSession(true); 463 464 String clientId = request.getParameter( "clientId" ); 465 // if user doesn't supply a 'clientId', we'll just use a default. 466 if( clientId == null ) { 467 clientId = "defaultAjaxWebClient"; 468 } 469 String sessionKey = session.getId() + '-' + clientId; 470 471 AjaxWebClient client = null; 472 synchronized (ajaxWebClients) { 473 client = ajaxWebClients.get( sessionKey ); 474 // create a new AjaxWebClient if one does not already exist for this sessionKey. 475 if( client == null ) { 476 if (LOG.isDebugEnabled()) { 477 LOG.debug( "creating new AjaxWebClient in "+sessionKey ); 478 } 479 client = new AjaxWebClient( request, maximumReadTimeout ); 480 ajaxWebClients.put( sessionKey, client ); 481 } 482 client.updateLastAccessed(); 483 } 484 return client; 485 } 486 487 /** 488 * @return the timeout value for read requests which is always >= 0 and <= 489 * maximumReadTimeout to avoid DoS attacks 490 */ 491 protected long getReadTimeout(HttpServletRequest request) { 492 long answer = defaultReadTimeout; 493 494 String name = request.getParameter(readTimeoutParameter); 495 if (name != null) { 496 answer = asLong(name); 497 } 498 if (answer < 0 || answer > maximumReadTimeout) { 499 answer = maximumReadTimeout; 500 } 501 return answer; 502 } 503 504 /* 505 * an instance of this class runs every minute (started in init), to clean up old web clients & free resources. 506 */ 507 private class ClientCleaner extends TimerTask { 508 @Override 509 public void run() { 510 if( LOG.isDebugEnabled() ) { 511 LOG.debug( "Cleaning up expired web clients." ); 512 } 513 514 synchronized( ajaxWebClients ) { 515 Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator(); 516 while ( it.hasNext() ) { 517 Map.Entry<String,AjaxWebClient> e = it.next(); 518 String key = e.getKey(); 519 AjaxWebClient val = e.getValue(); 520 if ( LOG.isDebugEnabled() ) { 521 LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." ); 522 } 523 // close an expired client and remove it from the ajaxWebClients hash. 524 if( val.closeIfExpired() ) { 525 if ( LOG.isDebugEnabled() ) { 526 LOG.debug( "Removing expired AjaxWebClient " + key ); 527 } 528 it.remove(); 529 } 530 } 531 } 532 } 533 } 534 535 @Override 536 public void destroy() { 537 // make sure we cancel the timer 538 clientCleanupTimer.cancel(); 539 super.destroy(); 540 } 541}