001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.util.concurrent.ArrayBlockingQueue; 022import java.util.concurrent.TimeUnit; 023 024import org.apache.activemq.command.Response; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028public class FutureResponse { 029 private static final Logger LOG = LoggerFactory.getLogger(FutureResponse.class); 030 031 private final ResponseCallback responseCallback; 032 private final TransportFilter transportFilter; 033 034 private final ArrayBlockingQueue<Response> responseSlot = new ArrayBlockingQueue<Response>(1); 035 036 public FutureResponse(ResponseCallback responseCallback) { 037 this(responseCallback, null); 038 } 039 040 public FutureResponse(ResponseCallback responseCallback, TransportFilter transportFilter) { 041 this.responseCallback = responseCallback; 042 this.transportFilter = transportFilter; 043 } 044 045 public Response getResult() throws IOException { 046 boolean hasInterruptPending = Thread.interrupted(); 047 try { 048 return responseSlot.take(); 049 } catch (InterruptedException e) { 050 hasInterruptPending = false; 051 throw dealWithInterrupt(e); 052 } finally { 053 if (hasInterruptPending) { 054 Thread.currentThread().interrupt(); 055 } 056 } 057 } 058 059 private InterruptedIOException dealWithInterrupt(InterruptedException e) { 060 if (LOG.isDebugEnabled()) { 061 LOG.debug("Operation interrupted: " + e, e); 062 } 063 InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage()); 064 interruptedIOException.initCause(e); 065 try { 066 if (transportFilter != null) { 067 transportFilter.onException(interruptedIOException); 068 } 069 } finally { 070 Thread.currentThread().interrupt(); 071 } 072 return interruptedIOException; 073 } 074 075 public Response getResult(int timeout) throws IOException { 076 final boolean wasInterrupted = Thread.interrupted(); 077 try { 078 Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS); 079 if (result == null && timeout > 0) { 080 throw new RequestTimedOutIOException(); 081 } 082 return result; 083 } catch (InterruptedException e) { 084 throw dealWithInterrupt(e); 085 } finally { 086 if (wasInterrupted) { 087 Thread.currentThread().interrupt(); 088 } 089 } 090 } 091 092 public void set(Response result) { 093 if (responseSlot.offer(result)) { 094 if (responseCallback != null) { 095 responseCallback.onCompletion(this); 096 } 097 } 098 } 099}