001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.state; 018 019import java.io.IOException; 020import java.util.Iterator; 021import java.util.LinkedHashMap; 022import java.util.Map; 023import java.util.Map.Entry; 024import java.util.Vector; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027 028import javax.jms.TransactionRolledBackException; 029import javax.transaction.xa.XAResource; 030 031import org.apache.activemq.command.Command; 032import org.apache.activemq.command.ConnectionId; 033import org.apache.activemq.command.ConnectionInfo; 034import org.apache.activemq.command.ConsumerControl; 035import org.apache.activemq.command.ConsumerId; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.DestinationInfo; 038import org.apache.activemq.command.ExceptionResponse; 039import org.apache.activemq.command.IntegerResponse; 040import org.apache.activemq.command.Message; 041import org.apache.activemq.command.MessagePull; 042import org.apache.activemq.command.ProducerId; 043import org.apache.activemq.command.ProducerInfo; 044import org.apache.activemq.command.Response; 045import org.apache.activemq.command.SessionId; 046import org.apache.activemq.command.SessionInfo; 047import org.apache.activemq.command.TransactionInfo; 048import org.apache.activemq.transport.Transport; 049import org.apache.activemq.util.IOExceptionSupport; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * Tracks the state of a connection so a newly established transport can be 055 * re-initialized to the state that was tracked. 056 * 057 * 058 */ 059public class ConnectionStateTracker extends CommandVisitorAdapter { 060 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class); 061 062 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); 063 private static final int MESSAGE_PULL_SIZE = 400; 064 protected final ConcurrentMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 065 066 private boolean trackTransactions; 067 private boolean restoreSessions = true; 068 private boolean restoreConsumers = true; 069 private boolean restoreProducers = true; 070 private boolean restoreTransaction = true; 071 private boolean trackMessages = true; 072 private boolean trackTransactionProducers = true; 073 private int maxCacheSize = 128 * 1024; 074 private long currentCacheSize; // use long to prevent overflow for folks who set high max. 075 076 private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){ 077 @Override 078 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) { 079 boolean result = currentCacheSize > maxCacheSize; 080 if (result) { 081 if (eldest.getValue() instanceof Message) { 082 currentCacheSize -= ((Message)eldest.getValue()).getSize(); 083 } else if (eldest.getValue() instanceof MessagePull) { 084 currentCacheSize -= MESSAGE_PULL_SIZE; 085 } 086 if (LOG.isTraceEnabled()) { 087 LOG.trace("removing tracked message: " + eldest.getKey()); 088 } 089 } 090 return result; 091 } 092 }; 093 094 private class RemoveTransactionAction implements ResponseHandler { 095 private final TransactionInfo info; 096 097 public RemoveTransactionAction(TransactionInfo info) { 098 this.info = info; 099 } 100 101 @Override 102 public void onResponse(Command response) { 103 ConnectionId connectionId = info.getConnectionId(); 104 ConnectionState cs = connectionStates.get(connectionId); 105 if (cs != null) { 106 cs.removeTransactionState(info.getTransactionId()); 107 } 108 } 109 } 110 111 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction { 112 public PrepareReadonlyTransactionAction(TransactionInfo info) { 113 super(info); 114 } 115 116 @Override 117 public void onResponse(Command command) { 118 if (command instanceof IntegerResponse) { 119 IntegerResponse response = (IntegerResponse) command; 120 if (XAResource.XA_RDONLY == response.getResult()) { 121 // all done, no commit or rollback from TM 122 super.onResponse(command); 123 } 124 } 125 } 126 } 127 128 /** 129 * Entry point for all tracked commands in the tracker. Commands should be tracked before 130 * there is an attempt to send them on the wire. Upon a successful send of a command it is 131 * necessary to call the trackBack method to complete the tracking of the given command. 132 * 133 * @param command 134 * The command that is to be tracked by this tracker. 135 * 136 * @return null if the command is not state tracked. 137 * 138 * @throws IOException if an error occurs during setup of the tracking operation. 139 */ 140 public Tracked track(Command command) throws IOException { 141 try { 142 return (Tracked)command.visit(this); 143 } catch (IOException e) { 144 throw e; 145 } catch (Throwable e) { 146 throw IOExceptionSupport.create(e); 147 } 148 } 149 150 /** 151 * Completes the two phase tracking operation for a command that is sent on the wire. Once 152 * the command is sent successfully to complete the tracking operation or otherwise update 153 * the state of the tracker. 154 * 155 * @param command 156 * The command that was previously provided to the track method. 157 */ 158 public void trackBack(Command command) { 159 if (command != null) { 160 if (trackMessages && command.isMessage()) { 161 Message message = (Message) command; 162 if (message.getTransactionId()==null) { 163 currentCacheSize = currentCacheSize + message.getSize(); 164 } 165 } else if (command instanceof MessagePull) { 166 // We only track one MessagePull per consumer so only add to cache size 167 // when the command has been marked as tracked. 168 if (((MessagePull)command).isTracked()) { 169 // just needs to be a rough estimate of size, ~4 identifiers 170 currentCacheSize += MESSAGE_PULL_SIZE; 171 } 172 } 173 } 174 } 175 176 public void restore(Transport transport) throws IOException { 177 // Restore the connections. 178 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) { 179 ConnectionState connectionState = iter.next(); 180 connectionState.getInfo().setFailoverReconnect(true); 181 if (LOG.isDebugEnabled()) { 182 LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); 183 } 184 transport.oneway(connectionState.getInfo()); 185 restoreTempDestinations(transport, connectionState); 186 187 if (restoreSessions) { 188 restoreSessions(transport, connectionState); 189 } 190 191 if (restoreTransaction) { 192 restoreTransactions(transport, connectionState); 193 } 194 } 195 196 // now flush messages and MessagePull commands. 197 for (Command msg : messageCache.values()) { 198 if (LOG.isDebugEnabled()) { 199 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg)); 200 } 201 transport.oneway(msg); 202 } 203 } 204 205 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { 206 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>(); 207 for (TransactionState transactionState : connectionState.getTransactionStates()) { 208 if (LOG.isDebugEnabled()) { 209 LOG.debug("tx: " + transactionState.getId()); 210 } 211 212 // rollback any completed transactions - no way to know if commit got there 213 // or if reply went missing 214 // 215 if (!transactionState.getCommands().isEmpty()) { 216 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1); 217 if (lastCommand instanceof TransactionInfo) { 218 TransactionInfo transactionInfo = (TransactionInfo) lastCommand; 219 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) { 220 if (LOG.isDebugEnabled()) { 221 LOG.debug("rolling back potentially completed tx: " + transactionState.getId()); 222 } 223 toRollback.add(transactionInfo); 224 continue; 225 } 226 } 227 } 228 229 // replay short lived producers that may have been involved in the transaction 230 for (ProducerState producerState : transactionState.getProducerStates().values()) { 231 if (LOG.isDebugEnabled()) { 232 LOG.debug("tx replay producer :" + producerState.getInfo()); 233 } 234 transport.oneway(producerState.getInfo()); 235 } 236 237 for (Command command : transactionState.getCommands()) { 238 if (LOG.isDebugEnabled()) { 239 LOG.debug("tx replay: " + command); 240 } 241 transport.oneway(command); 242 } 243 244 for (ProducerState producerState : transactionState.getProducerStates().values()) { 245 if (LOG.isDebugEnabled()) { 246 LOG.debug("tx remove replayed producer :" + producerState.getInfo()); 247 } 248 transport.oneway(producerState.getInfo().createRemoveCommand()); 249 } 250 } 251 252 for (TransactionInfo command: toRollback) { 253 // respond to the outstanding commit 254 ExceptionResponse response = new ExceptionResponse(); 255 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId())); 256 response.setCorrelationId(command.getCommandId()); 257 transport.getTransportListener().onCommand(response); 258 } 259 } 260 261 /** 262 * @param transport 263 * @param connectionState 264 * @throws IOException 265 */ 266 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException { 267 // Restore the connection's sessions 268 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { 269 SessionState sessionState = (SessionState)iter2.next(); 270 if (LOG.isDebugEnabled()) { 271 LOG.debug("session: " + sessionState.getInfo().getSessionId()); 272 } 273 transport.oneway(sessionState.getInfo()); 274 275 if (restoreProducers) { 276 restoreProducers(transport, sessionState); 277 } 278 279 if (restoreConsumers) { 280 restoreConsumers(transport, sessionState); 281 } 282 } 283 } 284 285 /** 286 * @param transport 287 * @param sessionState 288 * @throws IOException 289 */ 290 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException { 291 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete 292 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId()); 293 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete(); 294 for (ConsumerState consumerState : sessionState.getConsumerStates()) { 295 ConsumerInfo infoToSend = consumerState.getInfo(); 296 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) { 297 infoToSend = consumerState.getInfo().copy(); 298 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo()); 299 infoToSend.setPrefetchSize(0); 300 if (LOG.isDebugEnabled()) { 301 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize()); 302 } 303 } 304 if (LOG.isDebugEnabled()) { 305 LOG.debug("consumer: " + infoToSend.getConsumerId()); 306 } 307 transport.oneway(infoToSend); 308 } 309 } 310 311 /** 312 * @param transport 313 * @param sessionState 314 * @throws IOException 315 */ 316 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException { 317 // Restore the session's producers 318 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { 319 ProducerState producerState = (ProducerState)iter3.next(); 320 if (LOG.isDebugEnabled()) { 321 LOG.debug("producer: " + producerState.getInfo().getProducerId()); 322 } 323 transport.oneway(producerState.getInfo()); 324 } 325 } 326 327 /** 328 * @param transport 329 * @param connectionState 330 * @throws IOException 331 */ 332 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState) 333 throws IOException { 334 // Restore the connection's temp destinations. 335 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) { 336 DestinationInfo info = (DestinationInfo)iter2.next(); 337 transport.oneway(info); 338 if (LOG.isDebugEnabled()) { 339 LOG.debug("tempDest: " + info.getDestination()); 340 } 341 } 342 } 343 344 @Override 345 public Response processAddDestination(DestinationInfo info) { 346 if (info != null) { 347 ConnectionState cs = connectionStates.get(info.getConnectionId()); 348 if (cs != null && info.getDestination().isTemporary()) { 349 cs.addTempDestination(info); 350 } 351 } 352 return TRACKED_RESPONSE_MARKER; 353 } 354 355 @Override 356 public Response processRemoveDestination(DestinationInfo info) { 357 if (info != null) { 358 ConnectionState cs = connectionStates.get(info.getConnectionId()); 359 if (cs != null && info.getDestination().isTemporary()) { 360 cs.removeTempDestination(info.getDestination()); 361 } 362 } 363 return TRACKED_RESPONSE_MARKER; 364 } 365 366 @Override 367 public Response processAddProducer(ProducerInfo info) { 368 if (info != null && info.getProducerId() != null) { 369 SessionId sessionId = info.getProducerId().getParentId(); 370 if (sessionId != null) { 371 ConnectionId connectionId = sessionId.getParentId(); 372 if (connectionId != null) { 373 ConnectionState cs = connectionStates.get(connectionId); 374 if (cs != null) { 375 SessionState ss = cs.getSessionState(sessionId); 376 if (ss != null) { 377 ss.addProducer(info); 378 } 379 } 380 } 381 } 382 } 383 return TRACKED_RESPONSE_MARKER; 384 } 385 386 @Override 387 public Response processRemoveProducer(ProducerId id) { 388 if (id != null) { 389 SessionId sessionId = id.getParentId(); 390 if (sessionId != null) { 391 ConnectionId connectionId = sessionId.getParentId(); 392 if (connectionId != null) { 393 ConnectionState cs = connectionStates.get(connectionId); 394 if (cs != null) { 395 SessionState ss = cs.getSessionState(sessionId); 396 if (ss != null) { 397 ss.removeProducer(id); 398 } 399 } 400 } 401 } 402 } 403 return TRACKED_RESPONSE_MARKER; 404 } 405 406 @Override 407 public Response processAddConsumer(ConsumerInfo info) { 408 if (info != null) { 409 SessionId sessionId = info.getConsumerId().getParentId(); 410 if (sessionId != null) { 411 ConnectionId connectionId = sessionId.getParentId(); 412 if (connectionId != null) { 413 ConnectionState cs = connectionStates.get(connectionId); 414 if (cs != null) { 415 SessionState ss = cs.getSessionState(sessionId); 416 if (ss != null) { 417 ss.addConsumer(info); 418 } 419 } 420 } 421 } 422 } 423 return TRACKED_RESPONSE_MARKER; 424 } 425 426 @Override 427 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) { 428 if (id != null) { 429 SessionId sessionId = id.getParentId(); 430 if (sessionId != null) { 431 ConnectionId connectionId = sessionId.getParentId(); 432 if (connectionId != null) { 433 ConnectionState cs = connectionStates.get(connectionId); 434 if (cs != null) { 435 SessionState ss = cs.getSessionState(sessionId); 436 if (ss != null) { 437 ss.removeConsumer(id); 438 } 439 cs.getRecoveringPullConsumers().remove(id); 440 } 441 } 442 } 443 } 444 return TRACKED_RESPONSE_MARKER; 445 } 446 447 @Override 448 public Response processAddSession(SessionInfo info) { 449 if (info != null) { 450 ConnectionId connectionId = info.getSessionId().getParentId(); 451 if (connectionId != null) { 452 ConnectionState cs = connectionStates.get(connectionId); 453 if (cs != null) { 454 cs.addSession(info); 455 } 456 } 457 } 458 return TRACKED_RESPONSE_MARKER; 459 } 460 461 @Override 462 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) { 463 if (id != null) { 464 ConnectionId connectionId = id.getParentId(); 465 if (connectionId != null) { 466 ConnectionState cs = connectionStates.get(connectionId); 467 if (cs != null) { 468 cs.removeSession(id); 469 } 470 } 471 } 472 return TRACKED_RESPONSE_MARKER; 473 } 474 475 @Override 476 public Response processAddConnection(ConnectionInfo info) { 477 if (info != null) { 478 connectionStates.put(info.getConnectionId(), new ConnectionState(info)); 479 } 480 return TRACKED_RESPONSE_MARKER; 481 } 482 483 @Override 484 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { 485 if (id != null) { 486 connectionStates.remove(id); 487 } 488 return TRACKED_RESPONSE_MARKER; 489 } 490 491 @Override 492 public Response processMessage(Message send) throws Exception { 493 if (send != null) { 494 if (trackTransactions && send.getTransactionId() != null) { 495 ProducerId producerId = send.getProducerId(); 496 ConnectionId connectionId = producerId.getParentId().getParentId(); 497 if (connectionId != null) { 498 ConnectionState cs = connectionStates.get(connectionId); 499 if (cs != null) { 500 TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); 501 if (transactionState != null) { 502 transactionState.addCommand(send); 503 504 if (trackTransactionProducers) { 505 // for jmstemplate, track the producer in case it is closed before commit 506 // and needs to be replayed 507 SessionState ss = cs.getSessionState(producerId.getParentId()); 508 ProducerState producerState = ss.getProducerState(producerId); 509 producerState.setTransactionState(transactionState); 510 } 511 } 512 } 513 } 514 return TRACKED_RESPONSE_MARKER; 515 }else if (trackMessages) { 516 messageCache.put(send.getMessageId(), send); 517 } 518 } 519 return null; 520 } 521 522 @Override 523 public Response processBeginTransaction(TransactionInfo info) { 524 if (trackTransactions && info != null && info.getTransactionId() != null) { 525 ConnectionId connectionId = info.getConnectionId(); 526 if (connectionId != null) { 527 ConnectionState cs = connectionStates.get(connectionId); 528 if (cs != null) { 529 cs.addTransactionState(info.getTransactionId()); 530 TransactionState state = cs.getTransactionState(info.getTransactionId()); 531 state.addCommand(info); 532 } 533 } 534 return TRACKED_RESPONSE_MARKER; 535 } 536 return null; 537 } 538 539 @Override 540 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 541 if (trackTransactions && info != null && info.getTransactionId() != null) { 542 ConnectionId connectionId = info.getConnectionId(); 543 if (connectionId != null) { 544 ConnectionState cs = connectionStates.get(connectionId); 545 if (cs != null) { 546 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 547 if (transactionState != null) { 548 transactionState.addCommand(info); 549 return new Tracked(new PrepareReadonlyTransactionAction(info)); 550 } 551 } 552 } 553 } 554 return null; 555 } 556 557 @Override 558 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 559 if (trackTransactions && info != null && info.getTransactionId() != null) { 560 ConnectionId connectionId = info.getConnectionId(); 561 if (connectionId != null) { 562 ConnectionState cs = connectionStates.get(connectionId); 563 if (cs != null) { 564 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 565 if (transactionState != null) { 566 transactionState.addCommand(info); 567 return new Tracked(new RemoveTransactionAction(info)); 568 } 569 } 570 } 571 } 572 return null; 573 } 574 575 @Override 576 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 577 if (trackTransactions && info != null && info.getTransactionId() != null) { 578 ConnectionId connectionId = info.getConnectionId(); 579 if (connectionId != null) { 580 ConnectionState cs = connectionStates.get(connectionId); 581 if (cs != null) { 582 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 583 if (transactionState != null) { 584 transactionState.addCommand(info); 585 return new Tracked(new RemoveTransactionAction(info)); 586 } 587 } 588 } 589 } 590 return null; 591 } 592 593 @Override 594 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 595 if (trackTransactions && info != null && info.getTransactionId() != null) { 596 ConnectionId connectionId = info.getConnectionId(); 597 if (connectionId != null) { 598 ConnectionState cs = connectionStates.get(connectionId); 599 if (cs != null) { 600 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 601 if (transactionState != null) { 602 transactionState.addCommand(info); 603 return new Tracked(new RemoveTransactionAction(info)); 604 } 605 } 606 } 607 } 608 return null; 609 } 610 611 @Override 612 public Response processEndTransaction(TransactionInfo info) throws Exception { 613 if (trackTransactions && info != null && info.getTransactionId() != null) { 614 ConnectionId connectionId = info.getConnectionId(); 615 if (connectionId != null) { 616 ConnectionState cs = connectionStates.get(connectionId); 617 if (cs != null) { 618 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 619 if (transactionState != null) { 620 transactionState.addCommand(info); 621 } 622 } 623 } 624 return TRACKED_RESPONSE_MARKER; 625 } 626 return null; 627 } 628 629 @Override 630 public Response processMessagePull(MessagePull pull) throws Exception { 631 if (pull != null) { 632 // leave a single instance in the cache 633 final String id = pull.getDestination() + "::" + pull.getConsumerId(); 634 if (messageCache.put(id.intern(), pull) == null) { 635 // Only marked as tracked if this is the first request we've seen. 636 pull.setTracked(true); 637 } 638 } 639 return null; 640 } 641 642 public boolean isRestoreConsumers() { 643 return restoreConsumers; 644 } 645 646 public void setRestoreConsumers(boolean restoreConsumers) { 647 this.restoreConsumers = restoreConsumers; 648 } 649 650 public boolean isRestoreProducers() { 651 return restoreProducers; 652 } 653 654 public void setRestoreProducers(boolean restoreProducers) { 655 this.restoreProducers = restoreProducers; 656 } 657 658 public boolean isRestoreSessions() { 659 return restoreSessions; 660 } 661 662 public void setRestoreSessions(boolean restoreSessions) { 663 this.restoreSessions = restoreSessions; 664 } 665 666 public boolean isTrackTransactions() { 667 return trackTransactions; 668 } 669 670 public void setTrackTransactions(boolean trackTransactions) { 671 this.trackTransactions = trackTransactions; 672 } 673 674 public boolean isTrackTransactionProducers() { 675 return this.trackTransactionProducers; 676 } 677 678 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 679 this.trackTransactionProducers = trackTransactionProducers; 680 } 681 682 public boolean isRestoreTransaction() { 683 return restoreTransaction; 684 } 685 686 public void setRestoreTransaction(boolean restoreTransaction) { 687 this.restoreTransaction = restoreTransaction; 688 } 689 690 public boolean isTrackMessages() { 691 return trackMessages; 692 } 693 694 public void setTrackMessages(boolean trackMessages) { 695 this.trackMessages = trackMessages; 696 } 697 698 public int getMaxCacheSize() { 699 return maxCacheSize; 700 } 701 702 public void setMaxCacheSize(int maxCacheSize) { 703 this.maxCacheSize = maxCacheSize; 704 } 705 706 /** 707 * @return the current cache size for the Message and MessagePull Command cache. 708 */ 709 public long getCurrentCacheSize() { 710 return this.currentCacheSize; 711 } 712 713 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) { 714 ConnectionState connectionState = connectionStates.get(connectionId); 715 if (connectionState != null) { 716 connectionState.setConnectionInterruptProcessingComplete(true); 717 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers(); 718 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) { 719 ConsumerControl control = new ConsumerControl(); 720 control.setConsumerId(entry.getKey()); 721 control.setPrefetch(entry.getValue().getPrefetchSize()); 722 control.setDestination(entry.getValue().getDestination()); 723 try { 724 if (LOG.isDebugEnabled()) { 725 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch()); 726 } 727 transport.oneway(control); 728 } catch (Exception ex) { 729 if (LOG.isDebugEnabled()) { 730 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId() 731 + " with: " + control.getPrefetch(), ex); 732 } 733 } 734 } 735 stalledConsumers.clear(); 736 } 737 } 738 739 public void transportInterrupted(ConnectionId connectionId) { 740 ConnectionState connectionState = connectionStates.get(connectionId); 741 if (connectionState != null) { 742 connectionState.setConnectionInterruptProcessingComplete(false); 743 } 744 } 745}