001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.web; 019 020import java.io.IOException; 021import java.io.PrintWriter; 022import java.util.Enumeration; 023import java.util.HashMap; 024import java.util.HashSet; 025 026import javax.jms.Destination; 027import javax.jms.JMSException; 028import javax.jms.Message; 029import javax.jms.MessageConsumer; 030import javax.jms.ObjectMessage; 031import javax.jms.TextMessage; 032import javax.servlet.ServletConfig; 033import javax.servlet.ServletException; 034import javax.servlet.http.HttpServletRequest; 035import javax.servlet.http.HttpServletResponse; 036 037import org.apache.activemq.MessageAvailableConsumer; 038import org.apache.activemq.MessageAvailableListener; 039import org.eclipse.jetty.continuation.Continuation; 040import org.eclipse.jetty.continuation.ContinuationSupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * A servlet for sending and receiving messages to/from JMS destinations using 046 * HTTP POST for sending and HTTP GET for receiving. 047 * <p/> 048 * You can specify the destination and whether it is a topic or queue via 049 * configuration details on the servlet or as request parameters. 050 * <p/> 051 * For reading messages you can specify a readTimeout parameter to determine how 052 * long the servlet should block for. 053 * 054 * One thing to keep in mind with this solution - due to the nature of REST, 055 * there will always be a chance of losing messages. Consider what happens when 056 * a message is retrieved from the broker but the web call is interrupted before 057 * the client receives the message in the response - the message is lost. 058 */ 059public class MessageServlet extends MessageServletSupport { 060 061 // its a bit pita that this servlet got intermixed with jetty continuation/rest 062 // instead of creating a special for that. We should have kept a simple servlet 063 // for good old fashioned request/response blocked communication. 064 065 private static final long serialVersionUID = 8737914695188481219L; 066 067 private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class); 068 069 private final String readTimeoutParameter = "readTimeout"; 070 private final String readTimeoutRequestAtt = "xamqReadDeadline"; 071 private final String oneShotParameter = "oneShot"; 072 private long defaultReadTimeout = -1; 073 private long maximumReadTimeout = 20000; 074 private long requestTimeout = 1000; 075 private String defaultContentType = "application/xml"; 076 077 private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>(); 078 private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>(); 079 080 @Override 081 public void init() throws ServletException { 082 ServletConfig servletConfig = getServletConfig(); 083 String name = servletConfig.getInitParameter("defaultReadTimeout"); 084 if (name != null) { 085 defaultReadTimeout = asLong(name); 086 } 087 name = servletConfig.getInitParameter("maximumReadTimeout"); 088 if (name != null) { 089 maximumReadTimeout = asLong(name); 090 } 091 name = servletConfig.getInitParameter("replyTimeout"); 092 if (name != null) { 093 requestTimeout = asLong(name); 094 } 095 name = servletConfig.getInitParameter("defaultContentType"); 096 if (name != null) { 097 defaultContentType = name; 098 } 099 } 100 101 /** 102 * Sends a message to a destination 103 * 104 * @param request 105 * @param response 106 * @throws ServletException 107 * @throws IOException 108 */ 109 @Override 110 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 111 // lets turn the HTTP post into a JMS Message 112 try { 113 114 String action = request.getParameter("action"); 115 String clientId = request.getParameter("clientId"); 116 if (action != null && clientId != null && action.equals("unsubscribe")) { 117 LOG.info("Unsubscribing client " + clientId); 118 WebClient client = getWebClient(request); 119 client.close(); 120 clients.remove(clientId); 121 return; 122 } 123 124 WebClient client = getWebClient(request); 125 126 String text = getPostedMessageBody(request); 127 128 // lets create the destination from the URI? 129 Destination destination = getDestination(client, request); 130 if (destination == null) { 131 throw new NoDestinationSuppliedException(); 132 } 133 134 if (LOG.isDebugEnabled()) { 135 LOG.debug("Sending message to: " + destination + " with text: " + text); 136 } 137 138 boolean sync = isSync(request); 139 TextMessage message = client.getSession().createTextMessage(text); 140 141 appendParametersToMessage(request, message); 142 boolean persistent = isSendPersistent(request); 143 int priority = getSendPriority(request); 144 long timeToLive = getSendTimeToLive(request); 145 client.send(destination, message, persistent, priority, timeToLive); 146 147 // lets return a unique URI for reliable messaging 148 response.setHeader("messageID", message.getJMSMessageID()); 149 response.setStatus(HttpServletResponse.SC_OK); 150 response.getWriter().write("Message sent"); 151 152 } catch (JMSException e) { 153 throw new ServletException("Could not post JMS message: " + e, e); 154 } 155 } 156 157 /** 158 * Supports a HTTP DELETE to be equivalent of consuming a singe message 159 * from a queue 160 */ 161 @Override 162 protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 163 doMessages(request, response); 164 } 165 166 /** 167 * Supports a HTTP DELETE to be equivalent of consuming a singe message 168 * from a queue 169 */ 170 @Override 171 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 172 doMessages(request, response); 173 } 174 175 /** 176 * Reads a message from a destination up to some specific timeout period 177 * 178 * @param request 179 * @param response 180 * @throws ServletException 181 * @throws IOException 182 */ 183 protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 184 MessageAvailableConsumer consumer = null; 185 186 try { 187 WebClient client = getWebClient(request); 188 Destination destination = getDestination(client, request); 189 if (destination == null) { 190 throw new NoDestinationSuppliedException(); 191 } 192 consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName)); 193 Continuation continuation = ContinuationSupport.getContinuation(request); 194 195 // Don't allow concurrent use of the consumer. Do make sure to allow 196 // subsequent calls on continuation to use the consumer. 197 if (continuation.isInitial()) { 198 synchronized (activeConsumers) { 199 if (activeConsumers.contains(consumer)) { 200 throw new ServletException("Concurrent access to consumer is not supported"); 201 } else { 202 activeConsumers.add(consumer); 203 } 204 } 205 } 206 207 Message message = null; 208 209 long deadline = getReadDeadline(request); 210 long timeout = deadline - System.currentTimeMillis(); 211 212 // Set the message available listener *before* calling receive to eliminate any 213 // chance of a missed notification between the time receive() completes without 214 // a message and the time the listener is set. 215 synchronized (consumer) { 216 Listener listener = (Listener) consumer.getAvailableListener(); 217 if (listener == null) { 218 listener = new Listener(consumer); 219 consumer.setAvailableListener(listener); 220 } 221 } 222 223 if (LOG.isDebugEnabled()) { 224 LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); 225 } 226 227 // Look for any available messages (need a little timeout). Always 228 // try at least one lookup; don't block past the deadline. 229 if (timeout <= 0) { 230 message = consumer.receiveNoWait(); 231 } else if (timeout < 10) { 232 message = consumer.receive(timeout); 233 } else { 234 message = consumer.receive(10); 235 } 236 237 if (message == null) { 238 handleContinuation(request, response, client, destination, consumer, deadline); 239 } else { 240 writeResponse(request, response, message); 241 closeConsumerOnOneShot(request, client, destination); 242 243 synchronized (activeConsumers) { 244 activeConsumers.remove(consumer); 245 } 246 } 247 } catch (JMSException e) { 248 throw new ServletException("Could not post JMS message: " + e, e); 249 } 250 } 251 252 protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination, 253 MessageAvailableConsumer consumer, long deadline) { 254 // Get an existing Continuation or create a new one if there are no events. 255 Continuation continuation = ContinuationSupport.getContinuation(request); 256 257 long timeout = deadline - System.currentTimeMillis(); 258 if ((continuation.isExpired()) || (timeout <= 0)) { 259 // Reset the continuation on the available listener for the consumer to prevent the 260 // next message receipt from being consumed without a valid, active continuation. 261 synchronized (consumer) { 262 Object obj = consumer.getAvailableListener(); 263 if (obj instanceof Listener) { 264 ((Listener) obj).setContinuation(null); 265 } 266 } 267 response.setStatus(HttpServletResponse.SC_NO_CONTENT); 268 closeConsumerOnOneShot(request, client, destination); 269 synchronized (activeConsumers) { 270 activeConsumers.remove(consumer); 271 } 272 return; 273 } 274 275 continuation.setTimeout(timeout); 276 continuation.suspend(); 277 278 synchronized (consumer) { 279 Listener listener = (Listener) consumer.getAvailableListener(); 280 281 // register this continuation with our listener. 282 listener.setContinuation(continuation); 283 } 284 } 285 286 protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException { 287 int messages = 0; 288 try { 289 response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP 290 // 1.1 291 response.setHeader("Pragma", "no-cache"); // HTTP 1.0 292 response.setDateHeader("Expires", 0); 293 // write a responds 294 PrintWriter writer = response.getWriter(); 295 296 // handle any message(s) 297 if (message == null) { 298 // No messages so OK response of for ajax else no content. 299 response.setStatus(HttpServletResponse.SC_NO_CONTENT); 300 } else { 301 // We have at least one message so set up the response 302 messages = 1; 303 304 String type = getContentType(request); 305 if (type != null) { 306 response.setContentType(type); 307 } else { 308 if (isXmlContent(message)) { 309 response.setContentType(defaultContentType); 310 } else { 311 response.setContentType("text/plain"); 312 } 313 } 314 response.setStatus(HttpServletResponse.SC_OK); 315 316 setResponseHeaders(response, message); 317 writeMessageResponse(writer, message); 318 writer.flush(); 319 } 320 } finally { 321 if (LOG.isDebugEnabled()) { 322 LOG.debug("Received " + messages + " message(s)"); 323 } 324 } 325 } 326 327 protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException { 328 if (message instanceof TextMessage) { 329 TextMessage textMsg = (TextMessage) message; 330 String txt = textMsg.getText(); 331 if (txt != null) { 332 if (txt.startsWith("<?")) { 333 txt = txt.substring(txt.indexOf("?>") + 2); 334 } 335 writer.print(txt); 336 } 337 } else if (message instanceof ObjectMessage) { 338 ObjectMessage objectMsg = (ObjectMessage) message; 339 Object object = objectMsg.getObject(); 340 if (object != null) { 341 writer.print(object.toString()); 342 } 343 } 344 } 345 346 protected boolean isXmlContent(Message message) throws JMSException { 347 if (message instanceof TextMessage) { 348 TextMessage textMsg = (TextMessage) message; 349 String txt = textMsg.getText(); 350 if (txt != null) { 351 // assume its xml when it starts with < 352 if (txt.startsWith("<")) { 353 return true; 354 } 355 } 356 } 357 // for any other kind of messages we dont assume xml 358 return false; 359 } 360 361 public WebClient getWebClient(HttpServletRequest request) { 362 String clientId = request.getParameter("clientId"); 363 if (clientId != null) { 364 synchronized (this) { 365 LOG.debug("Getting local client [" + clientId + "]"); 366 WebClient client = clients.get(clientId); 367 if (client == null) { 368 LOG.debug("Creating new client [" + clientId + "]"); 369 client = new WebClient(); 370 clients.put(clientId, client); 371 } 372 return client; 373 } 374 375 } else { 376 return WebClient.getWebClient(request); 377 } 378 } 379 380 protected String getContentType(HttpServletRequest request) { 381 String value = request.getParameter("xml"); 382 if (value != null && "true".equalsIgnoreCase(value)) { 383 return "application/xml"; 384 } 385 value = request.getParameter("json"); 386 if (value != null && "true".equalsIgnoreCase(value)) { 387 return "application/json"; 388 } 389 return null; 390 } 391 392 @SuppressWarnings("rawtypes") 393 protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException { 394 response.setHeader("destination", message.getJMSDestination().toString()); 395 response.setHeader("id", message.getJMSMessageID()); 396 397 // Return JMS properties as header values. 398 for (Enumeration names = message.getPropertyNames(); names.hasMoreElements();) { 399 String name = (String) names.nextElement(); 400 response.setHeader(name, message.getObjectProperty(name).toString()); 401 } 402 } 403 404 /** 405 * @return the timeout value for read requests which is always >= 0 and <= 406 * maximumReadTimeout to avoid DoS attacks 407 */ 408 protected long getReadDeadline(HttpServletRequest request) { 409 Long answer; 410 411 answer = (Long) request.getAttribute(readTimeoutRequestAtt); 412 413 if (answer == null) { 414 long timeout = defaultReadTimeout; 415 String name = request.getParameter(readTimeoutParameter); 416 if (name != null) { 417 timeout = asLong(name); 418 } 419 if (timeout < 0 || timeout > maximumReadTimeout) { 420 timeout = maximumReadTimeout; 421 } 422 423 answer = Long.valueOf(System.currentTimeMillis() + timeout); 424 } 425 return answer.longValue(); 426 } 427 428 /** 429 * Close the consumer if one-shot mode is used on the given request. 430 */ 431 protected void closeConsumerOnOneShot(HttpServletRequest request, WebClient client, Destination dest) { 432 if (asBoolean(request.getParameter(oneShotParameter), false)) { 433 try { 434 client.closeConsumer(dest); 435 } catch (JMSException jms_exc) { 436 LOG.warn("JMS exception on closing consumer after request with one-shot mode", jms_exc); 437 } 438 } 439 } 440 441 /* 442 * Listen for available messages and wakeup any continuations. 443 */ 444 private static class Listener implements MessageAvailableListener { 445 MessageConsumer consumer; 446 Continuation continuation; 447 448 Listener(MessageConsumer consumer) { 449 this.consumer = consumer; 450 } 451 452 public void setContinuation(Continuation continuation) { 453 synchronized (consumer) { 454 this.continuation = continuation; 455 } 456 } 457 458 @Override 459 public void onMessageAvailable(MessageConsumer consumer) { 460 assert this.consumer == consumer; 461 462 ((MessageAvailableConsumer) consumer).setAvailableListener(null); 463 464 synchronized (this.consumer) { 465 if (continuation != null) { 466 continuation.resume(); 467 } 468 } 469 } 470 } 471}