/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.state;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAResource;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the state of a connection so a newly established transport can be
 * re-initialized to the state that was tracked.
 * <p>
 * Commands are fed in through {@link #track(Command)}, which dispatches to the
 * {@code processXxx} visitor callbacks below. Those callbacks record connections,
 * sessions, producers, consumers, temporary destinations, transactions and
 * (optionally) sent messages. {@link #restore(Transport)} replays the recorded
 * state onto a transport.
 */
public class ConnectionStateTracker extends CommandVisitorAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);

    /** Shared marker returned for commands that are tracked but need no response callback. */
    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);

    /** Rough size estimate charged against the cache for each tracked MessagePull. */
    private static final int MESSAGE_PULL_SIZE = 400;

    protected final ConcurrentMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<>();

    private boolean trackTransactions;
    private boolean restoreSessions = true;
    private boolean restoreConsumers = true;
    private boolean restoreProducers = true;
    private boolean restoreTransaction = true;
    private boolean trackMessages = true;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    private long currentCacheSize; // use long to prevent overflow for folks who set high max.

    /**
     * Insertion-ordered cache of sent Message and MessagePull commands. The eldest
     * entry is evicted on insertion while {@code currentCacheSize} exceeds
     * {@code maxCacheSize}; the evicted entry's size is subtracted from the total.
     */
    private final Map<Object, Command> messageCache = new LinkedHashMap<Object, Command>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<Object, Command> eldest) {
            final boolean overLimit = currentCacheSize > maxCacheSize;
            if (overLimit) {
                final Command evicted = eldest.getValue();
                if (evicted instanceof Message) {
                    currentCacheSize -= ((Message) evicted).getSize();
                } else if (evicted instanceof MessagePull) {
                    currentCacheSize -= MESSAGE_PULL_SIZE;
                }
                LOG.trace("removing tracked message: {}", eldest.getKey());
            }
            return overLimit;
        }
    };

    /**
     * Response handler that drops the tracked transaction state once a response
     * for the completing command arrives.
     */
    private class RemoveTransactionAction implements ResponseHandler {
        private final TransactionInfo info;

        public RemoveTransactionAction(TransactionInfo info) {
            this.info = info;
        }

        @Override
        public void onResponse(Command response) {
            ConnectionState cs = connectionStates.get(info.getConnectionId());
            if (cs != null) {
                cs.removeTransactionState(info.getTransactionId());
            }
        }
    }

    /**
     * Response handler that un-tracks a consumer or producer when its add command
     * is answered with an {@link ExceptionResponse}.
     */
    private final class ExceptionResponseCheckAction implements ResponseHandler {
        private final Command tracked;

        public ExceptionResponseCheckAction(Command tracked) {
            this.tracked = tracked;
        }

        @Override
        public void onResponse(Command response) {
            if (ExceptionResponse.DATA_STRUCTURE_TYPE != response.getDataStructureType()) {
                return;
            }
            if (tracked.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
                processRemoveConsumer(((ConsumerInfo) tracked).getConsumerId(), 0L);
            } else if (tracked.getDataStructureType() == ProducerInfo.DATA_STRUCTURE_TYPE) {
                processRemoveProducer(((ProducerInfo) tracked).getProducerId());
            }
        }
    }

    /**
     * Response handler for prepare: removes the transaction state only when the
     * response is an {@link IntegerResponse} carrying {@link XAResource#XA_RDONLY}.
     */
    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
        public PrepareReadonlyTransactionAction(TransactionInfo info) {
            super(info);
        }

        @Override
        public void onResponse(Command command) {
            if (command instanceof IntegerResponse) {
                IntegerResponse response = (IntegerResponse) command;
                if (XAResource.XA_RDONLY == response.getResult()) {
                    // all done, no commit or rollback from TM
                    super.onResponse(command);
                }
            }
        }
    }

    /**
     * Entry point for all tracked commands in the tracker. Commands should be tracked before
     * there is an attempt to send them on the wire. Upon a successful send of a command it is
     * necessary to call the trackBack method to complete the tracking of the given command.
     *
     * @param command
     *      The command that is to be tracked by this tracker.
     *
     * @return null if the command is not state tracked.
     *
     * @throws IOException if an error occurs during setup of the tracking operation.
     */
    public Tracked track(Command command) throws IOException {
        try {
            return (Tracked) command.visit(this);
        } catch (IOException e) {
            throw e;
        } catch (Throwable e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * Completes the two phase tracking operation for a command that is sent on the wire. Once
     * the command is sent successfully, call this method to complete the tracking operation or
     * otherwise update the state of the tracker. Here that means adding the command's size to
     * the running cache size.
     *
     * @param command
     *      The command that was previously provided to the track method.
     */
    public void trackBack(Command command) {
        if (command == null) {
            return;
        }
        if (trackMessages && command.isMessage()) {
            Message message = (Message) command;
            // Mirror processMessage(): a message is placed in messageCache unless it was
            // routed to transaction tracking (trackTransactions && transactionId != null).
            // Eviction subtracts the size of every cached Message, so every cached Message
            // must also be counted here or currentCacheSize drifts low.
            if (message.getTransactionId() == null || !trackTransactions) {
                currentCacheSize += message.getSize();
            }
        } else if (command instanceof MessagePull) {
            // We only track one MessagePull per consumer so only add to cache size
            // when the command has been marked as tracked.
            if (((MessagePull) command).isTracked()) {
                // just needs to be a rough estimate of size, ~4 identifiers
                currentCacheSize += MESSAGE_PULL_SIZE;
            }
        }
    }

    /**
     * Replays the tracked state onto the given transport: each connection (flagged as a
     * failover reconnect), its temp destinations, then sessions and transactions when the
     * corresponding restore flags are set, and finally every cached Message / MessagePull.
     *
     * @param transport the transport to send the tracked commands to.
     * @throws IOException if sending any command fails.
     */
    public void restore(Transport transport) throws IOException {
        // Restore the connections.
        for (ConnectionState connectionState : connectionStates.values()) {
            connectionState.getInfo().setFailoverReconnect(true);
            LOG.debug("conn: {}", connectionState.getInfo().getConnectionId());
            transport.oneway(connectionState.getInfo());
            restoreTempDestinations(transport, connectionState);

            if (restoreSessions) {
                restoreSessions(transport, connectionState);
            }

            if (restoreTransaction) {
                restoreTransactions(transport, connectionState);
            }
        }

        // now flush messages and MessagePull commands.
        for (Command msg : messageCache.values()) {
            LOG.debug("command: {}", msg.isMessage() ? ((Message) msg).getMessageId() : msg);
            transport.oneway(msg);
        }
    }

    /**
     * Replays tracked transactions. A transaction whose last recorded command is a one phase
     * commit is not replayed; instead an {@link ExceptionResponse} carrying a
     * {@link TransactionRolledBackException} is delivered to the transport listener for the
     * outstanding commit. Every other transaction has its producers, commands and producer
     * removals sent in that order.
     */
    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
        Vector<TransactionInfo> toRollback = new Vector<>();
        for (TransactionState transactionState : connectionState.getTransactionStates()) {
            LOG.debug("tx: {}", transactionState.getId());

            // rollback any completed transactions - no way to know if commit got there
            // or if reply went missing
            //
            if (!transactionState.getCommands().isEmpty()) {
                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
                if (lastCommand instanceof TransactionInfo) {
                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
                        LOG.debug("rolling back potentially completed tx: {}", transactionState.getId());
                        toRollback.add(transactionInfo);
                        continue;
                    }
                }
            }

            // replay short lived producers that may have been involved in the transaction
            for (ProducerState producerState : transactionState.getProducerStates().values()) {
                LOG.debug("tx replay producer :{}", producerState.getInfo());
                transport.oneway(producerState.getInfo());
            }

            for (Command command : transactionState.getCommands()) {
                LOG.debug("tx replay: {}", command);
                transport.oneway(command);
            }

            for (ProducerState producerState : transactionState.getProducerStates().values()) {
                LOG.debug("tx remove replayed producer :{}", producerState.getInfo());
                transport.oneway(producerState.getInfo().createRemoveCommand());
            }
        }

        for (TransactionInfo command : toRollback) {
            // respond to the outstanding commit
            ExceptionResponse response = new ExceptionResponse();
            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
            response.setCorrelationId(command.getCommandId());
            transport.getTransportListener().onCommand(response);
        }
    }

    /**
     * Sends every tracked session of the connection, followed by its producers and
     * consumers when the corresponding restore flags are set.
     *
     * @param transport the transport to send the session commands to.
     * @param connectionState the connection whose sessions are restored.
     * @throws IOException if sending any command fails.
     */
    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
        // Restore the connection's sessions
        for (SessionState sessionState : connectionState.getSessionStates()) {
            LOG.debug("session: {}", sessionState.getInfo().getSessionId());
            transport.oneway(sessionState.getInfo());

            if (restoreProducers) {
                restoreProducers(transport, sessionState);
            }

            if (restoreConsumers) {
                restoreConsumers(transport, sessionState);
            }
        }
    }

    /**
     * Sends every tracked consumer of the session. While connection interruption processing
     * is not complete, a consumer with a positive prefetch is sent as a copy with prefetch 0
     * and the original info is remembered in the connection's recovering pull consumers so
     * {@link #connectionInterruptProcessingComplete(Transport, ConnectionId)} can restore it.
     *
     * @param transport the transport to send the consumer commands to.
     * @param sessionState the session whose consumers are restored.
     * @throws IOException if sending any command fails.
     */
    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
        // With no tracked connection there is nowhere to park the original prefetch, so the
        // consumers are sent unmodified rather than failing with a NullPointerException.
        final boolean connectionInterruptionProcessingComplete =
            connectionState == null || connectionState.isConnectionInterruptProcessingComplete();
        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
            ConsumerInfo infoToSend = consumerState.getInfo();
            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
                infoToSend = consumerState.getInfo().copy();
                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
                infoToSend.setPrefetchSize(0);
                LOG.debug("restore consumer: {} in pull mode pending recovery, overriding prefetch: {}",
                          infoToSend.getConsumerId(), consumerState.getInfo().getPrefetchSize());
            }
            LOG.debug("consumer: {}", infoToSend.getConsumerId());
            transport.oneway(infoToSend);
        }
    }

    /**
     * Sends every tracked producer of the session.
     *
     * @param transport the transport to send the producer commands to.
     * @param sessionState the session whose producers are restored.
     * @throws IOException if sending any command fails.
     */
    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
        // Restore the session's producers
        for (ProducerState producerState : sessionState.getProducerStates()) {
            LOG.debug("producer: {}", producerState.getInfo().getProducerId());
            transport.oneway(producerState.getInfo());
        }
    }

    /**
     * Sends every tracked temporary destination of the connection.
     *
     * @param transport the transport to send the destination commands to.
     * @param connectionState the connection whose temp destinations are restored.
     * @throws IOException if sending any command fails.
     */
    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
        throws IOException {
        // Restore the connection's temp destinations.
        for (DestinationInfo info : connectionState.getTempDestinations()) {
            transport.oneway(info);
            LOG.debug("tempDest: {}", info.getDestination());
        }
    }

    /** Records a temporary destination against its tracked connection. */
    @Override
    public Response processAddDestination(DestinationInfo info) {
        if (info != null) {
            ConnectionState cs = connectionStates.get(info.getConnectionId());
            if (cs != null && info.getDestination().isTemporary()) {
                cs.addTempDestination(info);
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a temporary destination of its tracked connection. */
    @Override
    public Response processRemoveDestination(DestinationInfo info) {
        if (info != null) {
            ConnectionState cs = connectionStates.get(info.getConnectionId());
            if (cs != null && info.getDestination().isTemporary()) {
                cs.removeTempDestination(info.getDestination());
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /**
     * Records a producer against its tracked session. When the command requires a response,
     * the returned Tracked removes the producer again if that response is an exception.
     */
    @Override
    public Response processAddProducer(ProducerInfo info) {
        if (info != null && info.getProducerId() != null) {
            SessionId sessionId = info.getProducerId().getParentId();
            if (sessionId != null) {
                ConnectionId connectionId = sessionId.getParentId();
                if (connectionId != null) {
                    ConnectionState cs = connectionStates.get(connectionId);
                    if (cs != null) {
                        SessionState ss = cs.getSessionState(sessionId);
                        if (ss != null) {
                            ss.addProducer(info);
                            if (info.isResponseRequired()) {
                                return new Tracked(new ExceptionResponseCheckAction(info));
                            }
                        }
                    }
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a producer of its tracked session. */
    @Override
    public Response processRemoveProducer(ProducerId id) {
        if (id != null) {
            SessionId sessionId = id.getParentId();
            if (sessionId != null) {
                ConnectionId connectionId = sessionId.getParentId();
                if (connectionId != null) {
                    ConnectionState cs = connectionStates.get(connectionId);
                    if (cs != null) {
                        SessionState ss = cs.getSessionState(sessionId);
                        if (ss != null) {
                            ss.removeProducer(id);
                        }
                    }
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /**
     * Records a consumer against its tracked session. When the command requires a response,
     * the returned Tracked removes the consumer again if that response is an exception.
     */
    @Override
    public Response processAddConsumer(ConsumerInfo info) {
        // The consumer id is null-checked the same way processAddProducer checks the producer id.
        if (info != null && info.getConsumerId() != null) {
            SessionId sessionId = info.getConsumerId().getParentId();
            if (sessionId != null) {
                ConnectionId connectionId = sessionId.getParentId();
                if (connectionId != null) {
                    ConnectionState cs = connectionStates.get(connectionId);
                    if (cs != null) {
                        SessionState ss = cs.getSessionState(sessionId);
                        if (ss != null) {
                            ss.addConsumer(info);
                            if (info.isResponseRequired()) {
                                return new Tracked(new ExceptionResponseCheckAction(info));
                            }
                        }
                    }
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a consumer of its tracked session and of the recovering pull consumers. */
    @Override
    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
        if (id != null) {
            SessionId sessionId = id.getParentId();
            if (sessionId != null) {
                ConnectionId connectionId = sessionId.getParentId();
                if (connectionId != null) {
                    ConnectionState cs = connectionStates.get(connectionId);
                    if (cs != null) {
                        SessionState ss = cs.getSessionState(sessionId);
                        if (ss != null) {
                            ss.removeConsumer(id);
                        }
                        cs.getRecoveringPullConsumers().remove(id);
                    }
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Records a session against its tracked connection. */
    @Override
    public Response processAddSession(SessionInfo info) {
        if (info != null && info.getSessionId() != null) {
            ConnectionId connectionId = info.getSessionId().getParentId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    cs.addSession(info);
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a session of its tracked connection. */
    @Override
    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
        if (id != null) {
            ConnectionId connectionId = id.getParentId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    cs.removeSession(id);
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Starts tracking a connection, replacing any state previously held for its id. */
    @Override
    public Response processAddConnection(ConnectionInfo info) {
        if (info != null) {
            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Stops tracking a connection. */
    @Override
    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
        if (id != null) {
            connectionStates.remove(id);
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /**
     * Tracks a sent message. With transaction tracking enabled, a transactional message is
     * appended to its transaction state (and its producer linked to the transaction when
     * {@code trackTransactionProducers} is set). Otherwise, with message tracking enabled,
     * the message is cached under its message id.
     *
     * @return the tracked marker for a transaction-tracked message, otherwise null.
     */
    @Override
    public Response processMessage(Message send) throws Exception {
        if (send != null) {
            if (trackTransactions && send.getTransactionId() != null) {
                // Walk the id chain defensively; any missing link simply means there is
                // no tracked state to update.
                ProducerId producerId = send.getProducerId();
                SessionId sessionId = producerId != null ? producerId.getParentId() : null;
                ConnectionId connectionId = sessionId != null ? sessionId.getParentId() : null;
                if (connectionId != null) {
                    ConnectionState cs = connectionStates.get(connectionId);
                    if (cs != null) {
                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
                        if (transactionState != null) {
                            transactionState.addCommand(send);

                            if (trackTransactionProducers) {
                                // for jmstemplate, track the producer in case it is closed before commit
                                // and needs to be replayed
                                SessionState ss = cs.getSessionState(sessionId);
                                ProducerState producerState = ss != null ? ss.getProducerState(producerId) : null;
                                if (producerState != null) {
                                    producerState.setTransactionState(transactionState);
                                }
                            }
                        }
                    }
                }
                return TRACKED_RESPONSE_MARKER;
            } else if (trackMessages) {
                messageCache.put(send.getMessageId(), send);
            }
        }
        return null;
    }

    /** Creates the transaction state on the tracked connection and records the begin command. */
    @Override
    public Response processBeginTransaction(TransactionInfo info) {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    cs.addTransactionState(info.getTransactionId());
                    TransactionState state = cs.getTransactionState(info.getTransactionId());
                    state.addCommand(info);
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }
        return null;
    }

    /**
     * Records the prepare command; the returned Tracked removes the transaction state
     * when the response reports a read-only transaction.
     */
    @Override
    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
                    if (transactionState != null) {
                        transactionState.addCommand(info);
                        return new Tracked(new PrepareReadonlyTransactionAction(info));
                    }
                }
            }
        }
        return null;
    }

    /** Records the one phase commit; the returned Tracked removes the transaction state on response. */
    @Override
    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
                    if (transactionState != null) {
                        transactionState.addCommand(info);
                        return new Tracked(new RemoveTransactionAction(info));
                    }
                }
            }
        }
        return null;
    }

    /** Records the two phase commit; the returned Tracked removes the transaction state on response. */
    @Override
    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
                    if (transactionState != null) {
                        transactionState.addCommand(info);
                        return new Tracked(new RemoveTransactionAction(info));
                    }
                }
            }
        }
        return null;
    }

    /** Records the rollback; the returned Tracked removes the transaction state on response. */
    @Override
    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
                    if (transactionState != null) {
                        transactionState.addCommand(info);
                        return new Tracked(new RemoveTransactionAction(info));
                    }
                }
            }
        }
        return null;
    }

    /** Records the end command on the tracked transaction state. */
    @Override
    public Response processEndTransaction(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
                    if (transactionState != null) {
                        transactionState.addCommand(info);
                    }
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }
        return null;
    }

    /**
     * Caches the pull request under a "destination::consumerId" key so only the latest
     * request per key is kept. The pull is flagged as tracked only when no earlier request
     * was cached under that key.
     */
    @Override
    public Response processMessagePull(MessagePull pull) throws Exception {
        if (pull != null) {
            // leave a single instance in the cache
            final String id = pull.getDestination() + "::" + pull.getConsumerId();
            if (messageCache.put(id.intern(), pull) == null) {
                // Only marked as tracked if this is the first request we've seen.
                pull.setTracked(true);
            }
        }
        return null;
    }

    public boolean isRestoreConsumers() {
        return restoreConsumers;
    }

    public void setRestoreConsumers(boolean restoreConsumers) {
        this.restoreConsumers = restoreConsumers;
    }

    public boolean isRestoreProducers() {
        return restoreProducers;
    }

    public void setRestoreProducers(boolean restoreProducers) {
        this.restoreProducers = restoreProducers;
    }

    public boolean isRestoreSessions() {
        return restoreSessions;
    }

    public void setRestoreSessions(boolean restoreSessions) {
        this.restoreSessions = restoreSessions;
    }

    public boolean isTrackTransactions() {
        return trackTransactions;
    }

    public void setTrackTransactions(boolean trackTransactions) {
        this.trackTransactions = trackTransactions;
    }

    public boolean isTrackTransactionProducers() {
        return this.trackTransactionProducers;
    }

    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
        this.trackTransactionProducers = trackTransactionProducers;
    }

    public boolean isRestoreTransaction() {
        return restoreTransaction;
    }

    public void setRestoreTransaction(boolean restoreTransaction) {
        this.restoreTransaction = restoreTransaction;
    }

    public boolean isTrackMessages() {
        return trackMessages;
    }

    public void setTrackMessages(boolean trackMessages) {
        this.trackMessages = trackMessages;
    }

    public int getMaxCacheSize() {
        return maxCacheSize;
    }

    public void setMaxCacheSize(int maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
    }

    /**
     * @return the current cache size for the Message and MessagePull Command cache.
     */
    public long getCurrentCacheSize() {
        return this.currentCacheSize;
    }

    /**
     * Marks interruption processing for the connection as complete and sends a
     * {@link ConsumerControl} carrying the original prefetch for every consumer that was
     * restored in pull mode, then clears that set. A failure to send a control is logged
     * at debug level and does not stop the remaining consumers from being processed.
     *
     * @param transport the transport to send the consumer controls to.
     * @param connectionId the connection whose recovering consumers are released.
     */
    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
        ConnectionState connectionState = connectionStates.get(connectionId);
        if (connectionState != null) {
            connectionState.setConnectionInterruptProcessingComplete(true);
            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
            for (Entry<ConsumerId, ConsumerInfo> entry : stalledConsumers.entrySet()) {
                ConsumerControl control = new ConsumerControl();
                control.setConsumerId(entry.getKey());
                control.setPrefetch(entry.getValue().getPrefetchSize());
                control.setDestination(entry.getValue().getDestination());
                try {
                    LOG.debug("restored recovering consumer: {} with: {}", control.getConsumerId(), control.getPrefetch());
                    transport.oneway(control);
                } catch (Exception ex) {
                    // Best effort: keep going with the remaining consumers.
                    LOG.debug("Failed to submit control for consumer: {} with: {}",
                              control.getConsumerId(), control.getPrefetch(), ex);
                }
            }
            stalledConsumers.clear();
        }
    }

    /**
     * Flags the connection as having interruption processing outstanding.
     *
     * @param connectionId the connection whose transport was interrupted.
     */
    public void transportInterrupted(ConnectionId connectionId) {
        ConnectionState connectionState = connectionStates.get(connectionId);
        if (connectionState != null) {
            connectionState.setConnectionInterruptProcessingComplete(false);
        }
    }
}