001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.reliable; 018 019import java.io.IOException; 020import java.util.SortedSet; 021import java.util.TreeSet; 022 023import org.apache.activemq.command.Command; 024import org.apache.activemq.command.ReplayCommand; 025import org.apache.activemq.command.Response; 026import org.apache.activemq.openwire.CommandIdComparator; 027import org.apache.activemq.transport.FutureResponse; 028import org.apache.activemq.transport.ResponseCorrelator; 029import org.apache.activemq.transport.Transport; 030import org.apache.activemq.transport.udp.UdpTransport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * This interceptor deals with out of order commands together with being able to 036 * handle dropped commands and the re-requesting dropped commands. 037 * 038 * @deprecated 039 */ 040@Deprecated 041public class ReliableTransport extends ResponseCorrelator { 042 private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class); 043 044 private ReplayStrategy replayStrategy; 045 private final SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator()); 046 private int expectedCounter = 1; 047 private int replayBufferCommandCount = 50; 048 private int requestTimeout = 2000; 049 private ReplayBuffer replayBuffer; 050 private Replayer replayer; 051 private UdpTransport udpTransport; 052 053 public ReliableTransport(Transport next, ReplayStrategy replayStrategy) { 054 super(next); 055 this.replayStrategy = replayStrategy; 056 } 057 058 public ReliableTransport(Transport next, UdpTransport udpTransport) throws IOException { 059 super(next, udpTransport.getSequenceGenerator()); 060 this.udpTransport = udpTransport; 061 this.replayer = udpTransport.createReplayer(); 062 } 063 064 /** 065 * Requests that a range of commands be replayed 066 */ 067 public void requestReplay(int fromCommandId, int toCommandId) { 068 ReplayCommand replay = new ReplayCommand(); 069 replay.setFirstNakNumber(fromCommandId); 070 replay.setLastNakNumber(toCommandId); 071 try { 072 oneway(replay); 073 } catch (IOException e) { 074 getTransportListener().onException(e); 075 } 076 } 077 078 @Override 079 public Object request(Object o) throws IOException { 080 final Command command = (Command)o; 081 FutureResponse response = asyncRequest(command, null); 082 while (true) { 083 Response result = response.getResult(requestTimeout); 084 if (result != null) { 085 return result; 086 } 087 onMissingResponse(command, response); 088 } 089 } 090 091 @Override 092 public Object request(Object o, int timeout) throws IOException { 093 final Command command = (Command)o; 094 FutureResponse response = asyncRequest(command, null); 095 while (timeout > 0) { 096 int time = timeout; 097 if (timeout > requestTimeout) { 098 time = requestTimeout; 099 } 100 Response result = response.getResult(time); 101 if (result != null) { 102 return result; 103 } 104 onMissingResponse(command, response); 105 timeout -= time; 106 } 107 return response.getResult(0); 108 } 109 110 @Override 111 public void onCommand(Object o) { 112 Command command = (Command)o; 113 // lets pass wireformat through 114 if (command.isWireFormatInfo()) { 115 super.onCommand(command); 116 return; 117 } else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) { 118 replayCommands((ReplayCommand)command); 119 return; 120 } 121 122 int actualCounter = command.getCommandId(); 123 boolean valid = expectedCounter == actualCounter; 124 125 if (!valid) { 126 synchronized (commands) { 127 int nextCounter = actualCounter; 128 boolean empty = commands.isEmpty(); 129 if (!empty) { 130 Command nextAvailable = commands.first(); 131 nextCounter = nextAvailable.getCommandId(); 132 } 133 134 try { 135 boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter); 136 137 if (keep) { 138 // lets add it to the list for later on 139 if (LOG.isDebugEnabled()) { 140 LOG.debug("Received out of order command which is being buffered for later: " + command); 141 } 142 commands.add(command); 143 } 144 } catch (IOException e) { 145 onException(e); 146 } 147 148 if (!empty) { 149 // lets see if the first item in the set is the next 150 // expected 151 command = commands.first(); 152 valid = expectedCounter == command.getCommandId(); 153 if (valid) { 154 commands.remove(command); 155 } 156 } 157 } 158 } 159 160 while (valid) { 161 // we've got a valid header so increment counter 162 replayStrategy.onReceivedPacket(this, expectedCounter); 163 expectedCounter++; 164 super.onCommand(command); 165 166 synchronized (commands) { 167 // we could have more commands left 168 valid = !commands.isEmpty(); 169 if (valid) { 170 // lets see if the first item in the set is the next 171 // expected 172 command = commands.first(); 173 valid = expectedCounter == command.getCommandId(); 174 if (valid) { 175 commands.remove(command); 176 } 177 } 178 } 179 } 180 } 181 182 public int getBufferedCommandCount() { 183 synchronized (commands) { 184 return commands.size(); 185 } 186 } 187 188 public int getExpectedCounter() { 189 return expectedCounter; 190 } 191 192 /** 193 * This property should never really be set - but is mutable primarily for 194 * test cases 195 */ 196 public void setExpectedCounter(int expectedCounter) { 197 this.expectedCounter = expectedCounter; 198 } 199 200 public int getRequestTimeout() { 201 return requestTimeout; 202 } 203 204 /** 205 * Sets the default timeout of requests before starting to request commands 206 * are replayed 207 */ 208 public void setRequestTimeout(int requestTimeout) { 209 this.requestTimeout = requestTimeout; 210 } 211 212 public ReplayStrategy getReplayStrategy() { 213 return replayStrategy; 214 } 215 216 public ReplayBuffer getReplayBuffer() { 217 if (replayBuffer == null) { 218 replayBuffer = createReplayBuffer(); 219 } 220 return replayBuffer; 221 } 222 223 public void setReplayBuffer(ReplayBuffer replayBuffer) { 224 this.replayBuffer = replayBuffer; 225 } 226 227 public int getReplayBufferCommandCount() { 228 return replayBufferCommandCount; 229 } 230 231 /** 232 * Sets the default number of commands which are buffered 233 */ 234 public void setReplayBufferCommandCount(int replayBufferSize) { 235 this.replayBufferCommandCount = replayBufferSize; 236 } 237 238 public void setReplayStrategy(ReplayStrategy replayStrategy) { 239 this.replayStrategy = replayStrategy; 240 } 241 242 public Replayer getReplayer() { 243 return replayer; 244 } 245 246 public void setReplayer(Replayer replayer) { 247 this.replayer = replayer; 248 } 249 250 @Override 251 public String toString() { 252 return next.toString(); 253 } 254 255 @Override 256 public void start() throws Exception { 257 if (udpTransport != null) { 258 udpTransport.setReplayBuffer(getReplayBuffer()); 259 } 260 if (replayStrategy == null) { 261 throw new IllegalArgumentException("Property replayStrategy not specified"); 262 } 263 super.start(); 264 } 265 266 /** 267 * Lets attempt to replay the request as a command may have disappeared 268 */ 269 protected void onMissingResponse(Command command, FutureResponse response) { 270 LOG.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message"); 271 272 int commandId = command.getCommandId(); 273 requestReplay(commandId, commandId); 274 } 275 276 protected ReplayBuffer createReplayBuffer() { 277 return new DefaultReplayBuffer(getReplayBufferCommandCount()); 278 } 279 280 protected void replayCommands(ReplayCommand command) { 281 try { 282 if (replayer == null) { 283 onException(new IOException("Cannot replay commands. No replayer property configured")); 284 } 285 if (LOG.isDebugEnabled()) { 286 LOG.debug("Processing replay command: " + command); 287 } 288 getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer); 289 290 // TODO we could proactively remove ack'd stuff from the replay 291 // buffer 292 // if we only have a single client talking to us 293 } catch (IOException e) { 294 onException(e); 295 } 296 } 297 298}