001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.http;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.net.URI;
023import java.util.zip.GZIPInputStream;
024import java.util.zip.GZIPOutputStream;
025
026import org.apache.activemq.command.ShutdownInfo;
027import org.apache.activemq.transport.FutureResponse;
028import org.apache.activemq.transport.util.TextWireFormat;
029import org.apache.activemq.util.ByteArrayOutputStream;
030import org.apache.activemq.util.IOExceptionSupport;
031import org.apache.activemq.util.IdGenerator;
032import org.apache.activemq.util.ServiceStopper;
033import org.apache.http.Header;
034import org.apache.http.HttpHost;
035import org.apache.http.HttpRequest;
036import org.apache.http.HttpRequestInterceptor;
037import org.apache.http.HttpResponse;
038import org.apache.http.HttpStatus;
039import org.apache.http.auth.AuthScope;
040import org.apache.http.auth.UsernamePasswordCredentials;
041import org.apache.http.client.HttpClient;
042import org.apache.http.client.HttpResponseException;
043import org.apache.http.client.ResponseHandler;
044import org.apache.http.client.methods.HttpGet;
045import org.apache.http.client.methods.HttpHead;
046import org.apache.http.client.methods.HttpOptions;
047import org.apache.http.client.methods.HttpPost;
048import org.apache.http.conn.ClientConnectionManager;
049import org.apache.http.conn.params.ConnRoutePNames;
050import org.apache.http.entity.ByteArrayEntity;
051import org.apache.http.impl.client.BasicResponseHandler;
052import org.apache.http.impl.client.DefaultHttpClient;
053import org.apache.http.impl.conn.PoolingClientConnectionManager;
054import org.apache.http.message.AbstractHttpMessage;
055import org.apache.http.params.HttpConnectionParams;
056import org.apache.http.params.HttpParams;
057import org.apache.http.protocol.HttpContext;
058import org.apache.http.util.EntityUtils;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
/**
 * An HTTP {@link org.apache.activemq.transport.Transport} which uses the
 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a>
 * library.
 */
067public class HttpClientTransport extends HttpTransportSupport {
068
069    public static final int MAX_CLIENT_TIMEOUT = 30000;
070    private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
071    private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
072
073    private HttpClient sendHttpClient;
074    private HttpClient receiveHttpClient;
075
076    private final String clientID = CLIENT_ID_GENERATOR.generateId();
077    private boolean trace;
078    private HttpGet httpMethod;
079    private volatile int receiveCounter;
080
081    private int soTimeout = MAX_CLIENT_TIMEOUT;
082
083    private boolean useCompression = false;
084    protected boolean canSendCompressed = false;
085    private int minSendAsCompressedSize = 0;
086
087    public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
088        super(wireFormat, remoteUrl);
089    }
090
091    public FutureResponse asyncRequest(Object command) throws IOException {
092        return null;
093    }
094
095    @Override
096    public void oneway(Object command) throws IOException {
097
098        if (isStopped()) {
099            throw new IOException("stopped.");
100        }
101        HttpPost httpMethod = new HttpPost(getRemoteUrl().toString());
102        configureMethod(httpMethod);
103        String data = getTextWireFormat().marshalText(command);
104        byte[] bytes = data.getBytes("UTF-8");
105        if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
106            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
107            GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
108            stream.write(bytes);
109            stream.close();
110            httpMethod.addHeader("Content-Type", "application/x-gzip");
111            if (LOG.isTraceEnabled()) {
112                LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
113            }
114            bytes = bytesOut.toByteArray();
115        }
116        ByteArrayEntity entity = new ByteArrayEntity(bytes);
117        httpMethod.setEntity(entity);
118
119        HttpClient client = null;
120        HttpResponse answer = null;
121        try {
122            client = getSendHttpClient();
123            answer = client.execute(httpMethod);
124            int status = answer.getStatusLine().getStatusCode();
125            if (status != HttpStatus.SC_OK) {
126                throw new IOException("Failed to post command: " + command + " as response was: " + answer);
127            }
128            if (command instanceof ShutdownInfo) {
129                try {
130                    stop();
131                } catch (Exception e) {
132                    LOG.warn("Error trying to stop HTTP client: "+ e, e);
133                }
134            }
135        } catch (IOException e) {
136            throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
137        } finally {
138            if (answer != null) {
139                EntityUtils.consume(answer.getEntity());
140            }
141        }
142    }
143
144    @Override
145    public Object request(Object command) throws IOException {
146        return null;
147    }
148
149    private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
150        Header encoding = answer.getEntity().getContentEncoding();
151        if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
152            return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
153        } else {
154            return new DataInputStream(answer.getEntity().getContent());
155        }
156    }
157
158    @Override
159    public void run() {
160
161        if (LOG.isTraceEnabled()) {
162            LOG.trace("HTTP GET consumer thread starting: " + this);
163        }
164        HttpClient httpClient = getReceiveHttpClient();
165        URI remoteUrl = getRemoteUrl();
166
167        while (!isStopped() && !isStopping()) {
168
169            httpMethod = new HttpGet(remoteUrl.toString());
170            configureMethod(httpMethod);
171            HttpResponse answer = null;
172
173            try {
174                answer = httpClient.execute(httpMethod);
175                int status = answer.getStatusLine().getStatusCode();
176                if (status != HttpStatus.SC_OK) {
177                    if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
178                        LOG.debug("GET timed out");
179                        try {
180                            Thread.sleep(1000);
181                        } catch (InterruptedException e) {
182                            onException(new InterruptedIOException());
183                            Thread.currentThread().interrupt();
184                            break;
185                        }
186                    } else {
187                        onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
188                        break;
189                    }
190                } else {
191                    receiveCounter++;
192                    DataInputStream stream = createDataInputStream(answer);
193                    Object command = getTextWireFormat().unmarshal(stream);
194                    if (command == null) {
195                        LOG.debug("Received null command from url: " + remoteUrl);
196                    } else {
197                        doConsume(command);
198                    }
199                    stream.close();
200                }
201            } catch (IOException e) {
202                onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
203                break;
204            } finally {
205                if (answer != null) {
206                    try {
207                        EntityUtils.consume(answer.getEntity());
208                    } catch (IOException e) {
209                    }
210                }
211            }
212        }
213    }
214
215    // Properties
216    // -------------------------------------------------------------------------
217    public HttpClient getSendHttpClient() {
218        if (sendHttpClient == null) {
219            sendHttpClient = createHttpClient();
220        }
221        return sendHttpClient;
222    }
223
224    public void setSendHttpClient(HttpClient sendHttpClient) {
225        this.sendHttpClient = sendHttpClient;
226    }
227
228    public HttpClient getReceiveHttpClient() {
229        if (receiveHttpClient == null) {
230            receiveHttpClient = createHttpClient();
231        }
232        return receiveHttpClient;
233    }
234
235    public void setReceiveHttpClient(HttpClient receiveHttpClient) {
236        this.receiveHttpClient = receiveHttpClient;
237    }
238
239    // Implementation methods
240    // -------------------------------------------------------------------------
241    @Override
242    protected void doStart() throws Exception {
243
244        if (LOG.isTraceEnabled()) {
245            LOG.trace("HTTP GET consumer thread starting: " + this);
246        }
247        HttpClient httpClient = getReceiveHttpClient();
248        URI remoteUrl = getRemoteUrl();
249
250        HttpHead httpMethod = new HttpHead(remoteUrl.toString());
251        configureMethod(httpMethod);
252
253        // Request the options from the server so we can find out if the broker we are
254        // talking to supports GZip compressed content.  If so and useCompression is on
255        // then we can compress our POST data, otherwise we must send it uncompressed to
256        // ensure backwards compatibility.
257        HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
258        ResponseHandler<String> handler = new BasicResponseHandler() {
259            @Override
260            public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {
261
262                for(Header header : response.getAllHeaders()) {
263                    if (header.getName().equals("Accepts-Encoding") && header.getValue().contains("gzip")) {
264                        LOG.info("Broker Servlet supports GZip compression.");
265                        canSendCompressed = true;
266                        break;
267                    }
268                }
269
270                return super.handleResponse(response);
271            }
272        };
273
274        try {
275            httpClient.execute(httpMethod, new BasicResponseHandler());
276            httpClient.execute(optionsMethod, handler);
277        } catch(Exception e) {
278            throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage());
279        }
280
281        super.doStart();
282    }
283
284    @Override
285    protected void doStop(ServiceStopper stopper) throws Exception {
286        if (httpMethod != null) {
287            // In some versions of the JVM a race between the httpMethod and the completion
288            // of the method when using HTTPS can lead to a deadlock.  This hack attempts to
289            // detect that and interrupt the thread that's locked so that they can complete
290            // on another attempt.
291            for (int i = 0; i < 3; ++i) {
292                Thread abortThread = new Thread(new Runnable() {
293
294                    @Override
295                    public void run() {
296                        try {
297                            httpMethod.abort();
298                        } catch (Exception e) {
299                        }
300                    }
301                });
302
303                abortThread.start();
304                abortThread.join(2000);
305                if (abortThread.isAlive() && !httpMethod.isAborted()) {
306                    abortThread.interrupt();
307                }
308            }
309        }
310    }
311
312    protected HttpClient createHttpClient() {
313        DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
314        if (useCompression) {
315            client.addRequestInterceptor( new HttpRequestInterceptor() {
316                @Override
317                public void process(HttpRequest request, HttpContext context) {
318                    // We expect to received a compression response that we un-gzip
319                    request.addHeader("Accept-Encoding", "gzip");
320                }
321            });
322        }
323        if (getProxyHost() != null) {
324            HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
325            client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
326
327            if(getProxyUser() != null && getProxyPassword() != null) {
328                client.getCredentialsProvider().setCredentials(
329                    new AuthScope(getProxyHost(), getProxyPort()),
330                    new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
331            }
332        }
333
334        HttpParams params = client.getParams();
335        HttpConnectionParams.setSoTimeout(params, soTimeout);
336
337        return client;
338    }
339
340    protected ClientConnectionManager createClientConnectionManager() {
341        return new PoolingClientConnectionManager();
342    }
343
344    protected void configureMethod(AbstractHttpMessage method) {
345        method.setHeader("clientID", clientID);
346    }
347
348    public boolean isTrace() {
349        return trace;
350    }
351
352    public void setTrace(boolean trace) {
353        this.trace = trace;
354    }
355
356    @Override
357    public int getReceiveCounter() {
358        return receiveCounter;
359    }
360
361    public int getSoTimeout() {
362        return soTimeout;
363    }
364
365    public void setSoTimeout(int soTimeout) {
366        this.soTimeout = soTimeout;
367    }
368
369    public void setUseCompression(boolean useCompression) {
370        this.useCompression = useCompression;
371    }
372
373    public boolean isUseCompression() {
374        return this.useCompression;
375    }
376
377    public int getMinSendAsCompressedSize() {
378        return minSendAsCompressedSize;
379    }
380
381    /**
382     * Sets the minimum size that must be exceeded on a send before compression is used if
383     * the useCompression option is specified.  For very small payloads compression can be
384     * inefficient compared to the transmission size savings.
385     *
386     * Default value is 0.
387     *
388     * @param minSendAsCompressedSize
389     */
390    public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
391        this.minSendAsCompressedSize = minSendAsCompressedSize;
392    }
393
394}