001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.http; 018 019import java.io.BufferedReader; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.util.HashMap; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.TimeUnit; 029import java.util.zip.GZIPInputStream; 030import javax.servlet.ServletException; 031import javax.servlet.http.HttpServlet; 032import javax.servlet.http.HttpServletRequest; 033import javax.servlet.http.HttpServletResponse; 034 035import org.apache.activemq.Service; 036import org.apache.activemq.command.Command; 037import org.apache.activemq.command.ConnectionInfo; 038import org.apache.activemq.command.WireFormatInfo; 039import org.apache.activemq.transport.Transport; 040import org.apache.activemq.transport.TransportAcceptListener; 041import org.apache.activemq.transport.util.TextWireFormat; 042import org.apache.activemq.transport.xstream.XStreamWireFormat; 043import org.apache.activemq.util.IOExceptionSupport; 044import org.apache.activemq.util.ServiceListener; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * A servlet which handles server side HTTP transport, delegating to the 050 * ActiveMQ broker. This servlet is designed for being embedded inside an 051 * ActiveMQ Broker using an embedded Jetty or Tomcat instance. 052 */ 053public class HttpTunnelServlet extends HttpServlet { 054 private static final long serialVersionUID = -3826714430767484333L; 055 private static final Logger LOG = LoggerFactory.getLogger(HttpTunnelServlet.class); 056 057 private TransportAcceptListener listener; 058 private HttpTransportFactory transportFactory; 059 private TextWireFormat wireFormat; 060 private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>(); 061 private final long requestTimeout = 30000L; 062 private HashMap<String, Object> transportOptions; 063 064 @SuppressWarnings("unchecked") 065 @Override 066 public void init() throws ServletException { 067 super.init(); 068 listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); 069 if (listener == null) { 070 throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); 071 } 072 transportFactory = (HttpTransportFactory)getServletContext().getAttribute("transportFactory"); 073 if (transportFactory == null) { 074 throw new ServletException("No such attribute 'transportFactory' available in the ServletContext"); 075 } 076 transportOptions = (HashMap<String, Object>)getServletContext().getAttribute("transportOptions"); 077 wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat"); 078 if (wireFormat == null) { 079 wireFormat = createWireFormat(); 080 } 081 } 082 083 @Override 084 protected void doOptions(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 085 response.addHeader("Accepts-Encoding", "gzip"); 086 super.doOptions(request, response); 087 } 088 089 @Override 090 protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 091 createTransportChannel(request, response); 092 } 093 094 @Override 095 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 096 // lets return the next response 097 Command packet = null; 098 int count = 0; 099 try { 100 BlockingQueueTransport transportChannel = getTransportChannel(request, response); 101 if (transportChannel == null) { 102 return; 103 } 104 105 packet = (Command)transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS); 106 107 DataOutputStream stream = new DataOutputStream(response.getOutputStream()); 108 wireFormat.marshal(packet, stream); 109 count++; 110 } catch (InterruptedException ignore) { 111 } 112 113 if (count == 0) { 114 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT); 115 } 116 } 117 118 @Override 119 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { 120 121 InputStream stream = request.getInputStream(); 122 String contentType = request.getContentType(); 123 if (contentType != null && contentType.equals("application/x-gzip")) { 124 stream = new GZIPInputStream(stream); 125 } 126 127 // Read the command directly from the reader, assuming UTF8 encoding 128 Command command = (Command) wireFormat.unmarshalText(new InputStreamReader(stream, "UTF-8")); 129 130 if (command instanceof WireFormatInfo) { 131 WireFormatInfo info = (WireFormatInfo) command; 132 if (!canProcessWireFormatVersion(info.getVersion())) { 133 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " 134 + info.getVersion()); 135 } 136 137 } else { 138 139 BlockingQueueTransport transport = getTransportChannel(request, response); 140 if (transport == null) { 141 return; 142 } 143 144 if (command instanceof ConnectionInfo) { 145 ((ConnectionInfo) command).setTransportContext(request.getAttribute("javax.servlet.request.X509Certificate")); 146 } 147 transport.doConsume(command); 148 } 149 } 150 151 private boolean canProcessWireFormatVersion(int version) { 152 return true; 153 } 154 155 protected String readRequestBody(HttpServletRequest request) throws IOException { 156 StringBuffer buffer = new StringBuffer(); 157 BufferedReader reader = request.getReader(); 158 while (true) { 159 String line = reader.readLine(); 160 if (line == null) { 161 break; 162 } else { 163 buffer.append(line); 164 buffer.append("\n"); 165 } 166 } 167 return buffer.toString(); 168 } 169 170 protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { 171 String clientID = request.getHeader("clientID"); 172 if (clientID == null) { 173 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); 174 LOG.warn("No clientID header specified"); 175 return null; 176 } 177 BlockingQueueTransport answer = clients.get(clientID); 178 if (answer == null) { 179 LOG.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: " + clientID); 180 return null; 181 } 182 return answer; 183 } 184 185 protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException { 186 final String clientID = request.getHeader("clientID"); 187 188 if (clientID == null) { 189 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified"); 190 LOG.warn("No clientID header specified"); 191 return null; 192 } 193 194 // Optimistically create the client's transport; this transport may be thrown away if the client has already registered. 195 BlockingQueueTransport answer = createTransportChannel(); 196 197 // Record the client's transport and ensure that it has not already registered; this is thread-safe and only allows one 198 // thread to register the client 199 if (clients.putIfAbsent(clientID, answer) != null) { 200 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established"); 201 LOG.warn("A session for clientID '" + clientID + "' has already been established"); 202 return null; 203 } 204 205 // Ensure that the client's transport is cleaned up when no longer 206 // needed. 207 answer.addServiceListener(new ServiceListener() { 208 public void started(Service service) { 209 // Nothing to do. 210 } 211 212 public void stopped(Service service) { 213 clients.remove(clientID); 214 } 215 }); 216 217 // Configure the transport with any additional properties or filters. Although the returned transport is not explicitly 218 // persisted, if it is a filter (e.g., InactivityMonitor) it will be linked to the client's transport as a TransportListener 219 // and not GC'd until the client's transport is disposed. 220 Transport transport = answer; 221 try { 222 // Preserve the transportOptions for future use by making a copy before applying (they are removed when applied). 223 HashMap<String, Object> options = new HashMap<String, Object>(transportOptions); 224 transport = transportFactory.serverConfigure(answer, null, options); 225 } catch (Exception e) { 226 throw IOExceptionSupport.create(e); 227 } 228 229 // Wait for the transport to be connected or disposed. 230 listener.onAccept(transport); 231 while (!transport.isConnected() && !transport.isDisposed()) { 232 try { 233 Thread.sleep(100); 234 } catch (InterruptedException ignore) { 235 } 236 } 237 238 // Ensure that the transport was not prematurely disposed. 239 if (transport.isDisposed()) { 240 response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for clientID '" + clientID + "' was prematurely disposed"); 241 LOG.warn("The session for clientID '" + clientID + "' was prematurely disposed"); 242 return null; 243 } 244 245 return answer; 246 } 247 248 protected BlockingQueueTransport createTransportChannel() { 249 return new BlockingQueueTransport(new LinkedBlockingQueue<Object>()); 250 } 251 252 protected TextWireFormat createWireFormat() { 253 return new XStreamWireFormat(); 254 } 255}