/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Response;
import org.apache.activemq.util.IntSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transport filter that stamps every outgoing command with the next id from
 * an {@link IntSequenceGenerator} and correlates incoming responses with the
 * requests that are still waiting for them, which gives callers blocking
 * request-response semantics on top of the one-way {@code next} transport.
 * <p>
 * Outstanding requests are kept in a map keyed by command id. That map and
 * the recorded failure ({@code error}) are only touched while holding the
 * map's monitor. Once a failure has been recorded (by {@link #onException}
 * or {@link #stop()}), every later {@link #asyncRequest} fails with it.
 */
public class ResponseCorrelator extends TransportFilter {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseCorrelator.class);

    /** Requests awaiting a response, keyed by command id; also the lock guarding {@link #error}. */
    private final Map<Integer, FutureResponse> requestMap = new HashMap<Integer, FutureResponse>();
    private final IntSequenceGenerator sequenceGenerator;
    /** First failure passed to {@link #dispose(IOException)}; {@code null} while none has been recorded. */
    private IOException error;

    /**
     * Creates a correlator that numbers commands with a fresh sequence generator.
     *
     * @param next the transport commands are forwarded to
     */
    public ResponseCorrelator(Transport next) {
        this(next, new IntSequenceGenerator());
    }

    /**
     * Creates a correlator that numbers commands with the supplied generator.
     *
     * @param next the transport commands are forwarded to
     * @param sequenceGenerator source of command ids
     */
    public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
        super(next);
        this.sequenceGenerator = sequenceGenerator;
    }

    /**
     * Sends a command without expecting a response: assigns the next command
     * id, marks the command as not requiring a response and forwards it.
     *
     * @param o the command to send; must be a {@link Command}
     * @throws IOException if the underlying transport fails to send
     * @throws ClassCastException if {@code o} is not a {@link Command}
     */
    public void oneway(Object o) throws IOException {
        Command command = (Command) o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        next.oneway(command);
    }

    /**
     * Sends a command that requires a response and returns the future that
     * {@link #onCommand(Object)} completes when the matching response arrives.
     * <p>
     * If a failure has already been recorded, the future is completed with an
     * {@link ExceptionResponse} wrapping it and the failure is thrown. If the
     * send itself fails, the request is withdrawn from the pending map (so the
     * entry does not linger), the future is completed the same way, and the
     * send failure is rethrown.
     *
     * @param o the command to send; must be a {@link Command}
     * @param responseCallback callback handed to the created {@link FutureResponse}; may be {@code null}
     * @return the future for the response
     * @throws IOException the previously recorded failure, or the failure raised by the send
     * @throws ClassCastException if {@code o} is not a {@link Command}
     */
    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command) o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback, this);
        Integer commandId = Integer.valueOf(command.getCommandId());

        IOException priorError;
        synchronized (requestMap) {
            priorError = this.error;
            if (priorError == null) {
                // Register before sending so a fast response cannot miss the entry.
                requestMap.put(commandId, future);
            }
        }

        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        }

        try {
            next.oneway(command);
        } catch (IOException sendFailure) {
            // Complete the future only if it is still registered; otherwise
            // onCommand() or dispose() has already removed and completed it.
            if (withdraw(commandId, future)) {
                future.set(new ExceptionResponse(sendFailure));
            }
            throw sendFailure;
        }
        return future;
    }

    /**
     * Removes a pending request, but only if the map still holds this exact future.
     *
     * @return {@code true} if the entry was present and has been removed
     */
    private boolean withdraw(Integer commandId, FutureResponse future) {
        synchronized (requestMap) {
            if (requestMap.get(commandId) != future) {
                return false;
            }
            requestMap.remove(commandId);
            return true;
        }
    }

    /**
     * Sends a command and waits for its response via {@link FutureResponse#getResult()}.
     *
     * @param command the command to send; must be a {@link Command}
     * @return whatever {@link FutureResponse#getResult()} yields
     * @throws IOException if the request cannot be sent or the result cannot be obtained
     */
    public Object request(Object command) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult();
    }

    /**
     * Sends a command and waits for its response via {@link FutureResponse#getResult(int)}.
     *
     * @param command the command to send; must be a {@link Command}
     * @param timeout passed through unchanged to {@link FutureResponse#getResult(int)},
     *        which defines its unit and the behaviour on expiry
     * @return whatever {@link FutureResponse#getResult(int)} yields
     * @throws IOException if the request cannot be sent or the result cannot be obtained
     */
    public Object request(Object command, int timeout) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult(timeout);
    }

    /**
     * Handles an incoming command. Responses complete the pending request with
     * the matching correlation id; a response nobody is waiting for is only
     * logged at debug level. Any other command goes to the transport listener.
     *
     * @param o the received object; must be a {@link Command}
     * @throws ClassCastException if {@code o} is not a {@link Command}
     */
    public void onCommand(Object o) {
        if (!(o instanceof Command)) {
            throw new ClassCastException("Object cannot be converted to a Command, Object: " + o);
        }
        Command command = (Command) o;
        if (!command.isResponse()) {
            getTransportListener().onCommand(command);
            return;
        }

        Response response = (Response) command;
        FutureResponse future;
        synchronized (requestMap) {
            future = requestMap.remove(Integer.valueOf(response.getCorrelationId()));
        }
        if (future != null) {
            future.set(response);
        } else if (LOG.isDebugEnabled()) {
            // Checked here rather than cached so runtime log-level changes are honoured.
            LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
        }
    }

    /**
     * If an async exception occurs, assume no responses will arrive for any of
     * the current requests: fail them all, then pass the exception on to the
     * superclass handler.
     *
     * @param error the asynchronous transport failure
     */
    public void onException(IOException error) {
        dispose(new TransportDisposedIOException("Disposed due to prior exception", error));
        super.onException(error);
    }

    /**
     * Fails all pending requests with a "Stopped." error and then stops the
     * superclass.
     *
     * @throws Exception if the superclass fails to stop
     */
    @Override
    public void stop() throws Exception {
        dispose(new IOException("Stopped."));
        super.stop();
    }

    /**
     * Records the first failure and completes every pending request with an
     * {@link ExceptionResponse} wrapping it. Calls made after a failure has
     * already been recorded do nothing.
     *
     * @param error the failure to record and report to pending requests
     */
    private void dispose(IOException error) {
        ArrayList<FutureResponse> pending = null;
        synchronized (requestMap) {
            if (this.error == null) {
                this.error = error;
                pending = new ArrayList<FutureResponse>(requestMap.values());
                requestMap.clear();
            }
        }
        if (pending == null) {
            return;
        }
        // Complete the futures outside the lock: set() may run caller-supplied callbacks.
        for (FutureResponse future : pending) {
            future.set(new ExceptionResponse(error));
        }
    }

    /**
     * @return the generator used to assign command ids
     */
    public IntSequenceGenerator getSequenceGenerator() {
        return sequenceGenerator;
    }

    /**
     * @return the string form of the wrapped {@code next} transport
     */
    @Override
    public String toString() {
        return next.toString();
    }
}