001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.http;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.URI;
023import java.util.zip.GZIPInputStream;
024import java.util.zip.GZIPOutputStream;
025
026import org.apache.activemq.command.ShutdownInfo;
027import org.apache.activemq.transport.FutureResponse;
028import org.apache.activemq.transport.util.TextWireFormat;
029import org.apache.activemq.util.ByteArrayOutputStream;
030import org.apache.activemq.util.IOExceptionSupport;
031import org.apache.activemq.util.IdGenerator;
032import org.apache.activemq.util.ServiceStopper;
033import org.apache.http.Header;
034import org.apache.http.HttpHost;
035import org.apache.http.HttpRequest;
036import org.apache.http.HttpRequestInterceptor;
037import org.apache.http.HttpResponse;
038import org.apache.http.HttpStatus;
039import org.apache.http.auth.AuthScope;
040import org.apache.http.auth.UsernamePasswordCredentials;
041import org.apache.http.client.HttpClient;
042import org.apache.http.client.HttpResponseException;
043import org.apache.http.client.ResponseHandler;
044import org.apache.http.client.methods.HttpGet;
045import org.apache.http.client.methods.HttpHead;
046import org.apache.http.client.methods.HttpOptions;
047import org.apache.http.client.methods.HttpPost;
048import org.apache.http.client.params.CookiePolicy;
049import org.apache.http.client.params.HttpClientParams;
050import org.apache.http.conn.ClientConnectionManager;
051import org.apache.http.conn.params.ConnRoutePNames;
052import org.apache.http.entity.ByteArrayEntity;
053import org.apache.http.impl.client.BasicResponseHandler;
054import org.apache.http.impl.client.DefaultHttpClient;
055import org.apache.http.impl.conn.PoolingClientConnectionManager;
056import org.apache.http.message.AbstractHttpMessage;
057import org.apache.http.params.HttpConnectionParams;
058import org.apache.http.params.HttpParams;
059import org.apache.http.protocol.HttpContext;
060import org.apache.http.util.EntityUtils;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064/**
065 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the
066 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a>
067 * library
068 */
069public class HttpClientTransport extends HttpTransportSupport {
070
071    public static final int MAX_CLIENT_TIMEOUT = 30000;
072    private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
073    private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
074
075    private HttpClient sendHttpClient;
076    private HttpClient receiveHttpClient;
077
078    private final String clientID = CLIENT_ID_GENERATOR.generateId();
079    private boolean trace;
080    private HttpGet httpMethod;
081    private volatile int receiveCounter;
082
083    private int soTimeout = MAX_CLIENT_TIMEOUT;
084
085    private boolean useCompression = false;
086    protected boolean canSendCompressed = false;
087    private int minSendAsCompressedSize = 0;
088
089    public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
090        super(wireFormat, remoteUrl);
091    }
092
093    public FutureResponse asyncRequest(Object command) throws IOException {
094        return null;
095    }
096
097    @Override
098    public void oneway(Object command) throws IOException {
099
100        if (isStopped()) {
101            throw new IOException("stopped.");
102        }
103        HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
104        configureMethod(httpMethod);
105        String data = getTextWireFormat().marshalText(command);
106        byte[] bytes = data.getBytes("UTF-8");
107        if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
108            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
109            GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
110            stream.write(bytes);
111            stream.close();
112            httpMethod.addHeader("Content-Type", "application/x-gzip");
113            if (LOG.isTraceEnabled()) {
114                LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
115            }
116            bytes = bytesOut.toByteArray();
117        }
118        ByteArrayEntity entity = new ByteArrayEntity(bytes);
119        httpMethod.setEntity(entity);
120
121        HttpClient client = null;
122        HttpResponse answer = null;
123        try {
124            client = getSendHttpClient();
125            answer = client.execute(httpMethod);
126            int status = answer.getStatusLine().getStatusCode();
127            if (status != HttpStatus.SC_OK) {
128                throw new IOException("Failed to post command: " + command + " as response was: " + answer);
129            }
130            if (command instanceof ShutdownInfo) {
131                try {
132                    stop();
133                } catch (Exception e) {
134                    LOG.warn("Error trying to stop HTTP client: "+ e, e);
135                }
136            }
137        } catch (IOException e) {
138            throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
139        } finally {
140            if (answer != null) {
141                EntityUtils.consume(answer.getEntity());
142            }
143        }
144    }
145
146    @Override
147    public Object request(Object command) throws IOException {
148        return null;
149    }
150
151    private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
152        Header encoding = answer.getEntity().getContentEncoding();
153        if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
154            return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
155        } else {
156            return new DataInputStream(answer.getEntity().getContent());
157        }
158    }
159
160    @Override
161    public void run() {
162
163        if (LOG.isTraceEnabled()) {
164            LOG.trace("HTTP GET consumer thread starting: " + this);
165        }
166        HttpClient httpClient = getReceiveHttpClient();
167        URI remoteUrl = getRemoteUrl();
168
169        while (!isStopped() && !isStopping()) {
170
171            httpMethod = new HttpGet(remoteUrl.toString());
172            configureMethod(httpMethod);
173            HttpResponse answer = null;
174
175            try {
176                answer = httpClient.execute(httpMethod);
177                int status = answer.getStatusLine().getStatusCode();
178                if (status != HttpStatus.SC_OK) {
179                    if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
180                        LOG.debug("GET timed out");
181                        try {
182                            Thread.sleep(1000);
183                        } catch (InterruptedException e) {
184                            onException(new InterruptedIOException());
185                            Thread.currentThread().interrupt();
186                            break;
187                        }
188                    } else {
189                        onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
190                        break;
191                    }
192                } else {
193                    receiveCounter++;
194                    DataInputStream stream = createDataInputStream(answer);
195                    Object command = getTextWireFormat().unmarshal(stream);
196                    if (command == null) {
197                        LOG.debug("Received null command from url: " + remoteUrl);
198                    } else {
199                        doConsume(command);
200                    }
201                    stream.close();
202                }
203            } catch (IOException e) {
204                onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
205                break;
206            } finally {
207                if (answer != null) {
208                    try {
209                        EntityUtils.consume(answer.getEntity());
210                    } catch (IOException e) {
211                    }
212                }
213            }
214        }
215    }
216
217    // Properties
218    // -------------------------------------------------------------------------
219    public HttpClient getSendHttpClient() {
220        if (sendHttpClient == null) {
221            sendHttpClient = createHttpClient();
222        }
223        return sendHttpClient;
224    }
225
226    public void setSendHttpClient(HttpClient sendHttpClient) {
227        this.sendHttpClient = sendHttpClient;
228    }
229
230    public HttpClient getReceiveHttpClient() {
231        if (receiveHttpClient == null) {
232            receiveHttpClient = createHttpClient();
233        }
234        return receiveHttpClient;
235    }
236
237    public void setReceiveHttpClient(HttpClient receiveHttpClient) {
238        this.receiveHttpClient = receiveHttpClient;
239    }
240
241    // Implementation methods
242    // -------------------------------------------------------------------------
243    @Override
244    protected void doStart() throws Exception {
245
246        if (LOG.isTraceEnabled()) {
247            LOG.trace("HTTP GET consumer thread starting: " + this);
248        }
249        HttpClient httpClient = getReceiveHttpClient();
250        URI remoteUrl = getRemoteUrl();
251
252        HttpHead httpMethod = new HttpHead(remoteUrl.toString());
253        configureMethod(httpMethod);
254
255        // Request the options from the server so we can find out if the broker we are
256        // talking to supports GZip compressed content.  If so and useCompression is on
257        // then we can compress our POST data, otherwise we must send it uncompressed to
258        // ensure backwards compatibility.
259        HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
260        ResponseHandler<String> handler = new BasicResponseHandler() {
261            @Override
262            public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
263
264                for(Header header : response.getAllHeaders()) {
265                    if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
266                        LOG.info("Broker Servlet supports GZip compression.");
267                        canSendCompressed = true;
268                        break;
269                    }
270                }
271
272                return super.handleResponse(response);
273            }
274        };
275
276        try {
277            httpClient.execute(httpMethod, new BasicResponseHandler());
278            httpClient.execute(optionsMethod, handler);
279        } catch(Exception e) {
280            throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
281        }
282
283        super.doStart();
284    }
285
286    @Override
287    protected void doStop(ServiceStopper stopper) throws Exception {
288        if (httpMethod != null) {
289            // In some versions of the JVM a race between the httpMethod and the completion
290            // of the method when using HTTPS can lead to a deadlock.  This hack attempts to
291            // detect that and interrupt the thread that's locked so that they can complete
292            // on another attempt.
293            for (int i = 0; i < 3; ++i) {
294                Thread abortThread = new Thread(new Runnable() {
295
296                    @Override
297                    public void run() {
298                        try {
299                            httpMethod.abort();
300                        } catch (Exception e) {
301                        }
302                    }
303                });
304
305                abortThread.start();
306                abortThread.join(2000);
307                if (abortThread.isAlive() && !httpMethod.isAborted()) {
308                    abortThread.interrupt();
309                }
310            }
311        }
312    }
313
314    protected HttpClient createHttpClient() {
315        DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
316        if (useCompression) {
317            client.addRequestInterceptor( new HttpRequestInterceptor() {
318                @Override
319                public void process(HttpRequest request, HttpContext context) {
320                    // We expect to received a compression response that we un-gzip
321                    request.addHeader("Accept-Encoding", "gzip");
322                }
323            });
324        }
325        if (getProxyHost() != null) {
326            HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
327            client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
328
329            if(getProxyUser() != null && getProxyPassword() != null) {
330                client.getCredentialsProvider().setCredentials(
331                    new AuthScope(getProxyHost(), getProxyPort()),
332                    new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
333            }
334        }
335
336        HttpParams params = client.getParams();
337        HttpConnectionParams.setSoTimeout(params, soTimeout);
338        HttpClientParams.setCookiePolicy(params, CookiePolicy.BROWSER_COMPATIBILITY);
339
340        return client;
341    }
342
343    protected ClientConnectionManager createClientConnectionManager() {
344        return new PoolingClientConnectionManager();
345    }
346
347    protected void configureMethod(AbstractHttpMessage method) {
348        method.setHeader("clientID", clientID);
349    }
350
351    public boolean isTrace() {
352        return trace;
353    }
354
355    public void setTrace(boolean trace) {
356        this.trace = trace;
357    }
358
359    @Override
360    public int getReceiveCounter() {
361        return receiveCounter;
362    }
363
364    public int getSoTimeout() {
365        return soTimeout;
366    }
367
368    public void setSoTimeout(int soTimeout) {
369        this.soTimeout = soTimeout;
370    }
371
372    public void setUseCompression(boolean useCompression) {
373        this.useCompression = useCompression;
374    }
375
376    public boolean isUseCompression() {
377        return this.useCompression;
378    }
379
380    public int getMinSendAsCompressedSize() {
381        return minSendAsCompressedSize;
382    }
383
384    /**
385     * Sets the minimum size that must be exceeded on a send before compression is used if
386     * the useCompression option is specified.  For very small payloads compression can be
387     * inefficient compared to the transmission size savings.
388     *
389     * Default value is 0.
390     *
391     * @param minSendAsCompressedSize
392     */
393    public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
394        this.minSendAsCompressedSize = minSendAsCompressedSize;
395    }
396
397}