001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.http; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.net.URI; 023import java.util.zip.GZIPInputStream; 024import java.util.zip.GZIPOutputStream; 025 026import org.apache.activemq.command.ShutdownInfo; 027import org.apache.activemq.transport.FutureResponse; 028import org.apache.activemq.transport.util.TextWireFormat; 029import org.apache.activemq.util.ByteArrayOutputStream; 030import org.apache.activemq.util.IOExceptionSupport; 031import org.apache.activemq.util.IdGenerator; 032import org.apache.activemq.util.ServiceStopper; 033import org.apache.http.Header; 034import org.apache.http.HttpHost; 035import org.apache.http.HttpRequest; 036import org.apache.http.HttpRequestInterceptor; 037import org.apache.http.HttpResponse; 038import org.apache.http.HttpStatus; 039import org.apache.http.auth.AuthScope; 040import org.apache.http.auth.UsernamePasswordCredentials; 041import org.apache.http.client.HttpClient; 042import org.apache.http.client.HttpResponseException; 043import org.apache.http.client.ResponseHandler; 044import org.apache.http.client.methods.HttpGet; 045import org.apache.http.client.methods.HttpHead; 046import org.apache.http.client.methods.HttpOptions; 047import org.apache.http.client.methods.HttpPost; 048import org.apache.http.client.params.CookiePolicy; 049import org.apache.http.client.params.HttpClientParams; 050import org.apache.http.conn.ClientConnectionManager; 051import org.apache.http.conn.params.ConnRoutePNames; 052import org.apache.http.entity.ByteArrayEntity; 053import org.apache.http.impl.client.BasicResponseHandler; 054import org.apache.http.impl.client.DefaultHttpClient; 055import org.apache.http.impl.conn.PoolingClientConnectionManager; 056import org.apache.http.message.AbstractHttpMessage; 057import org.apache.http.params.HttpConnectionParams; 058import org.apache.http.params.HttpParams; 059import org.apache.http.protocol.HttpContext; 060import org.apache.http.util.EntityUtils; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the 066 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a> 067 * library 068 */ 069public class HttpClientTransport extends HttpTransportSupport { 070 071 public static final int MAX_CLIENT_TIMEOUT = 30000; 072 private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class); 073 private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator(); 074 075 private HttpClient sendHttpClient; 076 private HttpClient receiveHttpClient; 077 078 private final String clientID = CLIENT_ID_GENERATOR.generateId(); 079 private boolean trace; 080 private HttpGet httpMethod; 081 private volatile int receiveCounter; 082 083 private int soTimeout = MAX_CLIENT_TIMEOUT; 084 085 private boolean useCompression = false; 086 protected boolean canSendCompressed = false; 087 private int minSendAsCompressedSize = 0; 088 089 public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { 090 super(wireFormat, remoteUrl); 091 } 092 093 public FutureResponse asyncRequest(Object command) throws IOException { 094 return null; 095 } 096 097 @Override 098 public void oneway(Object command) throws IOException { 099 100 if (isStopped()) { 101 throw new IOException("stopped."); 102 } 103 HttpPost httpMethod = new HttpPost(getRemoteUrl().toString()); 104 configureMethod(httpMethod); 105 String data = getTextWireFormat().marshalText(command); 106 byte[] bytes = data.getBytes("UTF-8"); 107 if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) { 108 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 109 GZIPOutputStream stream = new GZIPOutputStream(bytesOut); 110 stream.write(bytes); 111 stream.close(); 112 httpMethod.addHeader("Content-Type", "application/x-gzip"); 113 if (LOG.isTraceEnabled()) { 114 LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size()); 115 } 116 bytes = bytesOut.toByteArray(); 117 } 118 ByteArrayEntity entity = new ByteArrayEntity(bytes); 119 httpMethod.setEntity(entity); 120 121 HttpClient client = null; 122 HttpResponse answer = null; 123 try { 124 client = getSendHttpClient(); 125 answer = client.execute(httpMethod); 126 int status = answer.getStatusLine().getStatusCode(); 127 if (status != HttpStatus.SC_OK) { 128 throw new IOException("Failed to post command: " + command + " as response was: " + answer); 129 } 130 if (command instanceof ShutdownInfo) { 131 try { 132 stop(); 133 } catch (Exception e) { 134 LOG.warn("Error trying to stop HTTP client: "+ e, e); 135 } 136 } 137 } catch (IOException e) { 138 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 139 } finally { 140 if (answer != null) { 141 EntityUtils.consume(answer.getEntity()); 142 } 143 } 144 } 145 146 @Override 147 public Object request(Object command) throws IOException { 148 return null; 149 } 150 151 private DataInputStream createDataInputStream(HttpResponse answer) throws IOException { 152 Header encoding = answer.getEntity().getContentEncoding(); 153 if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) { 154 return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent())); 155 } else { 156 return new DataInputStream(answer.getEntity().getContent()); 157 } 158 } 159 160 @Override 161 public void run() { 162 163 if (LOG.isTraceEnabled()) { 164 LOG.trace("HTTP GET consumer thread starting: " + this); 165 } 166 HttpClient httpClient = getReceiveHttpClient(); 167 URI remoteUrl = getRemoteUrl(); 168 169 while (!isStopped() && !isStopping()) { 170 171 httpMethod = new HttpGet(remoteUrl.toString()); 172 configureMethod(httpMethod); 173 HttpResponse answer = null; 174 175 try { 176 answer = httpClient.execute(httpMethod); 177 int status = answer.getStatusLine().getStatusCode(); 178 if (status != HttpStatus.SC_OK) { 179 if (status == HttpStatus.SC_REQUEST_TIMEOUT) { 180 LOG.debug("GET timed out"); 181 try { 182 Thread.sleep(1000); 183 } catch (InterruptedException e) { 184 onException(new InterruptedIOException()); 185 Thread.currentThread().interrupt(); 186 break; 187 } 188 } else { 189 onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer)); 190 break; 191 } 192 } else { 193 receiveCounter++; 194 DataInputStream stream = createDataInputStream(answer); 195 Object command = getTextWireFormat().unmarshal(stream); 196 if (command == null) { 197 LOG.debug("Received null command from url: " + remoteUrl); 198 } else { 199 doConsume(command); 200 } 201 stream.close(); 202 } 203 } catch (IOException e) { 204 onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); 205 break; 206 } finally { 207 if (answer != null) { 208 try { 209 EntityUtils.consume(answer.getEntity()); 210 } catch (IOException e) { 211 } 212 } 213 } 214 } 215 } 216 217 // Properties 218 // ------------------------------------------------------------------------- 219 public HttpClient getSendHttpClient() { 220 if (sendHttpClient == null) { 221 sendHttpClient = createHttpClient(); 222 } 223 return sendHttpClient; 224 } 225 226 public void setSendHttpClient(HttpClient sendHttpClient) { 227 this.sendHttpClient = sendHttpClient; 228 } 229 230 public HttpClient getReceiveHttpClient() { 231 if (receiveHttpClient == null) { 232 receiveHttpClient = createHttpClient(); 233 } 234 return receiveHttpClient; 235 } 236 237 public void setReceiveHttpClient(HttpClient receiveHttpClient) { 238 this.receiveHttpClient = receiveHttpClient; 239 } 240 241 // Implementation methods 242 // ------------------------------------------------------------------------- 243 @Override 244 protected void doStart() throws Exception { 245 246 if (LOG.isTraceEnabled()) { 247 LOG.trace("HTTP GET consumer thread starting: " + this); 248 } 249 HttpClient httpClient = getReceiveHttpClient(); 250 URI remoteUrl = getRemoteUrl(); 251 252 HttpHead httpMethod = new HttpHead(remoteUrl.toString()); 253 configureMethod(httpMethod); 254 255 // Request the options from the server so we can find out if the broker we are 256 // talking to supports GZip compressed content. If so and useCompression is on 257 // then we can compress our POST data, otherwise we must send it uncompressed to 258 // ensure backwards compatibility. 259 HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString()); 260 ResponseHandler<String> handler = new BasicResponseHandler() { 261 @Override 262 public String handleResponse(HttpResponse response) throws HttpResponseException, IOException { 263 264 for(Header header : response.getAllHeaders()) { 265 if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) { 266 LOG.info("Broker Servlet supports GZip compression."); 267 canSendCompressed = true; 268 break; 269 } 270 } 271 272 return super.handleResponse(response); 273 } 274 }; 275 276 try { 277 httpClient.execute(httpMethod, new BasicResponseHandler()); 278 httpClient.execute(optionsMethod, handler); 279 } catch(Exception e) { 280 throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage()); 281 } 282 283 super.doStart(); 284 } 285 286 @Override 287 protected void doStop(ServiceStopper stopper) throws Exception { 288 if (httpMethod != null) { 289 // In some versions of the JVM a race between the httpMethod and the completion 290 // of the method when using HTTPS can lead to a deadlock. This hack attempts to 291 // detect that and interrupt the thread that's locked so that they can complete 292 // on another attempt. 293 for (int i = 0; i < 3; ++i) { 294 Thread abortThread = new Thread(new Runnable() { 295 296 @Override 297 public void run() { 298 try { 299 httpMethod.abort(); 300 } catch (Exception e) { 301 } 302 } 303 }); 304 305 abortThread.start(); 306 abortThread.join(2000); 307 if (abortThread.isAlive() && !httpMethod.isAborted()) { 308 abortThread.interrupt(); 309 } 310 } 311 } 312 } 313 314 protected HttpClient createHttpClient() { 315 DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager()); 316 if (useCompression) { 317 client.addRequestInterceptor( new HttpRequestInterceptor() { 318 @Override 319 public void process(HttpRequest request, HttpContext context) { 320 // We expect to received a compression response that we un-gzip 321 request.addHeader("Accept-Encoding", "gzip"); 322 } 323 }); 324 } 325 if (getProxyHost() != null) { 326 HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort()); 327 client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy); 328 329 if(getProxyUser() != null && getProxyPassword() != null) { 330 client.getCredentialsProvider().setCredentials( 331 new AuthScope(getProxyHost(), getProxyPort()), 332 new UsernamePasswordCredentials(getProxyUser(), getProxyPassword())); 333 } 334 } 335 336 HttpParams params = client.getParams(); 337 HttpConnectionParams.setSoTimeout(params, soTimeout); 338 HttpClientParams.setCookiePolicy(params, CookiePolicy.BROWSER_COMPATIBILITY); 339 340 return client; 341 } 342 343 protected ClientConnectionManager createClientConnectionManager() { 344 return new PoolingClientConnectionManager(); 345 } 346 347 protected void configureMethod(AbstractHttpMessage method) { 348 method.setHeader("clientID", clientID); 349 } 350 351 public boolean isTrace() { 352 return trace; 353 } 354 355 public void setTrace(boolean trace) { 356 this.trace = trace; 357 } 358 359 @Override 360 public int getReceiveCounter() { 361 return receiveCounter; 362 } 363 364 public int getSoTimeout() { 365 return soTimeout; 366 } 367 368 public void setSoTimeout(int soTimeout) { 369 this.soTimeout = soTimeout; 370 } 371 372 public void setUseCompression(boolean useCompression) { 373 this.useCompression = useCompression; 374 } 375 376 public boolean isUseCompression() { 377 return this.useCompression; 378 } 379 380 public int getMinSendAsCompressedSize() { 381 return minSendAsCompressedSize; 382 } 383 384 /** 385 * Sets the minimum size that must be exceeded on a send before compression is used if 386 * the useCompression option is specified. For very small payloads compression can be 387 * inefficient compared to the transmission size savings. 388 * 389 * Default value is 0. 390 * 391 * @param minSendAsCompressedSize 392 */ 393 public void setMinSendAsCompressedSize(int minSendAsCompressedSize) { 394 this.minSendAsCompressedSize = minSendAsCompressedSize; 395 } 396 397}