001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.http; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.net.URI; 023import java.util.zip.GZIPInputStream; 024import java.util.zip.GZIPOutputStream; 025 026import org.apache.activemq.command.ShutdownInfo; 027import org.apache.activemq.transport.FutureResponse; 028import org.apache.activemq.transport.util.TextWireFormat; 029import org.apache.activemq.util.ByteArrayOutputStream; 030import org.apache.activemq.util.IOExceptionSupport; 031import org.apache.activemq.util.IdGenerator; 032import org.apache.activemq.util.ServiceStopper; 033import org.apache.http.Header; 034import org.apache.http.HttpHost; 035import org.apache.http.HttpRequest; 036import org.apache.http.HttpRequestInterceptor; 037import org.apache.http.HttpResponse; 038import org.apache.http.HttpStatus; 039import org.apache.http.auth.AuthScope; 040import org.apache.http.auth.UsernamePasswordCredentials; 041import org.apache.http.client.HttpClient; 042import org.apache.http.client.HttpResponseException; 043import org.apache.http.client.ResponseHandler; 044import org.apache.http.client.methods.HttpGet; 045import org.apache.http.client.methods.HttpHead; 046import org.apache.http.client.methods.HttpOptions; 047import org.apache.http.client.methods.HttpPost; 048import org.apache.http.conn.ClientConnectionManager; 049import org.apache.http.conn.params.ConnRoutePNames; 050import org.apache.http.entity.ByteArrayEntity; 051import org.apache.http.impl.client.BasicResponseHandler; 052import org.apache.http.impl.client.DefaultHttpClient; 053import org.apache.http.impl.conn.PoolingClientConnectionManager; 054import org.apache.http.message.AbstractHttpMessage; 055import org.apache.http.params.HttpConnectionParams; 056import org.apache.http.params.HttpParams; 057import org.apache.http.protocol.HttpContext; 058import org.apache.http.util.EntityUtils; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the 064 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a> 065 * library 066 */ 067public class HttpClientTransport extends HttpTransportSupport { 068 069 public static final int MAX_CLIENT_TIMEOUT = 30000; 070 private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class); 071 private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator(); 072 073 private HttpClient sendHttpClient; 074 private HttpClient receiveHttpClient; 075 076 private final String clientID = CLIENT_ID_GENERATOR.generateId(); 077 private boolean trace; 078 private HttpGet httpMethod; 079 private volatile int receiveCounter; 080 081 private int soTimeout = MAX_CLIENT_TIMEOUT; 082 083 private boolean useCompression = false; 084 protected boolean canSendCompressed = false; 085 private int minSendAsCompressedSize = 0; 086 087 public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { 088 super(wireFormat, remoteUrl); 089 } 090 091 public FutureResponse asyncRequest(Object command) throws IOException { 092 return null; 093 } 094 095 @Override 096 public void oneway(Object command) throws IOException { 097 098 if (isStopped()) { 099 throw new IOException("stopped."); 100 } 101 HttpPost httpMethod = new HttpPost(getRemoteUrl().toString()); 102 configureMethod(httpMethod); 103 String data = getTextWireFormat().marshalText(command); 104 byte[] bytes = data.getBytes("UTF-8"); 105 if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) { 106 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 107 GZIPOutputStream stream = new GZIPOutputStream(bytesOut); 108 stream.write(bytes); 109 stream.close(); 110 httpMethod.addHeader("Content-Type", "application/x-gzip"); 111 if (LOG.isTraceEnabled()) { 112 LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size()); 113 } 114 bytes = bytesOut.toByteArray(); 115 } 116 ByteArrayEntity entity = new ByteArrayEntity(bytes); 117 httpMethod.setEntity(entity); 118 119 HttpClient client = null; 120 HttpResponse answer = null; 121 try { 122 client = getSendHttpClient(); 123 answer = client.execute(httpMethod); 124 int status = answer.getStatusLine().getStatusCode(); 125 if (status != HttpStatus.SC_OK) { 126 throw new IOException("Failed to post command: " + command + " as response was: " + answer); 127 } 128 if (command instanceof ShutdownInfo) { 129 try { 130 stop(); 131 } catch (Exception e) { 132 LOG.warn("Error trying to stop HTTP client: "+ e, e); 133 } 134 } 135 } catch (IOException e) { 136 throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e); 137 } finally { 138 if (answer != null) { 139 EntityUtils.consume(answer.getEntity()); 140 } 141 } 142 } 143 144 @Override 145 public Object request(Object command) throws IOException { 146 return null; 147 } 148 149 private DataInputStream createDataInputStream(HttpResponse answer) throws IOException { 150 Header encoding = answer.getEntity().getContentEncoding(); 151 if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) { 152 return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent())); 153 } else { 154 return new DataInputStream(answer.getEntity().getContent()); 155 } 156 } 157 158 @Override 159 public void run() { 160 161 if (LOG.isTraceEnabled()) { 162 LOG.trace("HTTP GET consumer thread starting: " + this); 163 } 164 HttpClient httpClient = getReceiveHttpClient(); 165 URI remoteUrl = getRemoteUrl(); 166 167 while (!isStopped() && !isStopping()) { 168 169 httpMethod = new HttpGet(remoteUrl.toString()); 170 configureMethod(httpMethod); 171 HttpResponse answer = null; 172 173 try { 174 answer = httpClient.execute(httpMethod); 175 int status = answer.getStatusLine().getStatusCode(); 176 if (status != HttpStatus.SC_OK) { 177 if (status == HttpStatus.SC_REQUEST_TIMEOUT) { 178 LOG.debug("GET timed out"); 179 try { 180 Thread.sleep(1000); 181 } catch (InterruptedException e) { 182 onException(new InterruptedIOException()); 183 Thread.currentThread().interrupt(); 184 break; 185 } 186 } else { 187 onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer)); 188 break; 189 } 190 } else { 191 receiveCounter++; 192 DataInputStream stream = createDataInputStream(answer); 193 Object command = getTextWireFormat().unmarshal(stream); 194 if (command == null) { 195 LOG.debug("Received null command from url: " + remoteUrl); 196 } else { 197 doConsume(command); 198 } 199 stream.close(); 200 } 201 } catch (IOException e) { 202 onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e)); 203 break; 204 } finally { 205 if (answer != null) { 206 try { 207 EntityUtils.consume(answer.getEntity()); 208 } catch (IOException e) { 209 } 210 } 211 } 212 } 213 } 214 215 // Properties 216 // ------------------------------------------------------------------------- 217 public HttpClient getSendHttpClient() { 218 if (sendHttpClient == null) { 219 sendHttpClient = createHttpClient(); 220 } 221 return sendHttpClient; 222 } 223 224 public void setSendHttpClient(HttpClient sendHttpClient) { 225 this.sendHttpClient = sendHttpClient; 226 } 227 228 public HttpClient getReceiveHttpClient() { 229 if (receiveHttpClient == null) { 230 receiveHttpClient = createHttpClient(); 231 } 232 return receiveHttpClient; 233 } 234 235 public void setReceiveHttpClient(HttpClient receiveHttpClient) { 236 this.receiveHttpClient = receiveHttpClient; 237 } 238 239 // Implementation methods 240 // ------------------------------------------------------------------------- 241 @Override 242 protected void doStart() throws Exception { 243 244 if (LOG.isTraceEnabled()) { 245 LOG.trace("HTTP GET consumer thread starting: " + this); 246 } 247 HttpClient httpClient = getReceiveHttpClient(); 248 URI remoteUrl = getRemoteUrl(); 249 250 HttpHead httpMethod = new HttpHead(remoteUrl.toString()); 251 configureMethod(httpMethod); 252 253 // Request the options from the server so we can find out if the broker we are 254 // talking to supports GZip compressed content. If so and useCompression is on 255 // then we can compress our POST data, otherwise we must send it uncompressed to 256 // ensure backwards compatibility. 257 HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString()); 258 ResponseHandler<String> handler = new BasicResponseHandler() { 259 @Override 260 public String handleResponse(HttpResponse response) throws HttpResponseException, IOException { 261 262 for(Header header : response.getAllHeaders()) { 263 if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) { 264 LOG.info("Broker Servlet supports GZip compression."); 265 canSendCompressed = true; 266 break; 267 } 268 } 269 270 return super.handleResponse(response); 271 } 272 }; 273 274 try { 275 httpClient.execute(httpMethod, new BasicResponseHandler()); 276 httpClient.execute(optionsMethod, handler); 277 } catch(Exception e) { 278 throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage()); 279 } 280 281 super.doStart(); 282 } 283 284 @Override 285 protected void doStop(ServiceStopper stopper) throws Exception { 286 if (httpMethod != null) { 287 // In some versions of the JVM a race between the httpMethod and the completion 288 // of the method when using HTTPS can lead to a deadlock. This hack attempts to 289 // detect that and interrupt the thread that's locked so that they can complete 290 // on another attempt. 291 for (int i = 0; i < 3; ++i) { 292 Thread abortThread = new Thread(new Runnable() { 293 294 @Override 295 public void run() { 296 try { 297 httpMethod.abort(); 298 } catch (Exception e) { 299 } 300 } 301 }); 302 303 abortThread.start(); 304 abortThread.join(2000); 305 if (abortThread.isAlive() && !httpMethod.isAborted()) { 306 abortThread.interrupt(); 307 } 308 } 309 } 310 } 311 312 protected HttpClient createHttpClient() { 313 DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager()); 314 if (useCompression) { 315 client.addRequestInterceptor( new HttpRequestInterceptor() { 316 @Override 317 public void process(HttpRequest request, HttpContext context) { 318 // We expect to received a compression response that we un-gzip 319 request.addHeader("Accept-Encoding", "gzip"); 320 } 321 }); 322 } 323 if (getProxyHost() != null) { 324 HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort()); 325 client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy); 326 327 if(getProxyUser() != null && getProxyPassword() != null) { 328 client.getCredentialsProvider().setCredentials( 329 new AuthScope(getProxyHost(), getProxyPort()), 330 new UsernamePasswordCredentials(getProxyUser(), getProxyPassword())); 331 } 332 } 333 334 HttpParams params = client.getParams(); 335 HttpConnectionParams.setSoTimeout(params, soTimeout); 336 337 return client; 338 } 339 340 protected ClientConnectionManager createClientConnectionManager() { 341 return new PoolingClientConnectionManager(); 342 } 343 344 protected void configureMethod(AbstractHttpMessage method) { 345 method.setHeader("clientID", clientID); 346 } 347 348 public boolean isTrace() { 349 return trace; 350 } 351 352 public void setTrace(boolean trace) { 353 this.trace = trace; 354 } 355 356 @Override 357 public int getReceiveCounter() { 358 return receiveCounter; 359 } 360 361 public int getSoTimeout() { 362 return soTimeout; 363 } 364 365 public void setSoTimeout(int soTimeout) { 366 this.soTimeout = soTimeout; 367 } 368 369 public void setUseCompression(boolean useCompression) { 370 this.useCompression = useCompression; 371 } 372 373 public boolean isUseCompression() { 374 return this.useCompression; 375 } 376 377 public int getMinSendAsCompressedSize() { 378 return minSendAsCompressedSize; 379 } 380 381 /** 382 * Sets the minimum size that must be exceeded on a send before compression is used if 383 * the useCompression option is specified. For very small payloads compression can be 384 * inefficient compared to the transmission size savings. 385 * 386 * Default value is 0. 387 * 388 * @param minSendAsCompressedSize 389 */ 390 public void setMinSendAsCompressedSize(int minSendAsCompressedSize) { 391 this.minSendAsCompressedSize = minSendAsCompressedSize; 392 } 393 394}