001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.state;
018
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAResource;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
052
053/**
054 * Tracks the state of a connection so a newly established transport can be
055 * re-initialized to the state that was tracked.
056 *
057 *
058 */
059public class ConnectionStateTracker extends CommandVisitorAdapter {
060    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
061
062    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
063    private static final int MESSAGE_PULL_SIZE = 400;
064    protected final ConcurrentMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
065
066    private boolean trackTransactions;
067    private boolean restoreSessions = true;
068    private boolean restoreConsumers = true;
069    private boolean restoreProducers = true;
070    private boolean restoreTransaction = true;
071    private boolean trackMessages = true;
072    private boolean trackTransactionProducers = true;
073    private int maxCacheSize = 128 * 1024;
074    private long currentCacheSize;  // use long to prevent overflow for folks who set high max.
075
076    private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
077        @Override
078        protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
079            boolean result = currentCacheSize > maxCacheSize;
080            if (result) {
081                if (eldest.getValue() instanceof Message) {
082                    currentCacheSize -= ((Message)eldest.getValue()).getSize();
083                } else if (eldest.getValue() instanceof MessagePull) {
084                    currentCacheSize -= MESSAGE_PULL_SIZE;
085                }
086                if (LOG.isTraceEnabled()) {
087                    LOG.trace("removing tracked message: " + eldest.getKey());
088                }
089            }
090            return result;
091        }
092    };
093
094    private class RemoveTransactionAction implements ResponseHandler {
095        private final TransactionInfo info;
096
097        public RemoveTransactionAction(TransactionInfo info) {
098            this.info = info;
099        }
100
101        @Override
102        public void onResponse(Command response) {
103            ConnectionId connectionId = info.getConnectionId();
104            ConnectionState cs = connectionStates.get(connectionId);
105            if (cs != null) {
106                cs.removeTransactionState(info.getTransactionId());
107            }
108        }
109    }
110
111    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
112        public PrepareReadonlyTransactionAction(TransactionInfo info) {
113            super(info);
114        }
115
116        @Override
117        public void onResponse(Command command) {
118            if (command instanceof IntegerResponse) {
119                IntegerResponse response = (IntegerResponse) command;
120                if (XAResource.XA_RDONLY == response.getResult()) {
121                    // all done, no commit or rollback from TM
122                    super.onResponse(command);
123                }
124            }
125        }
126    }
127
128    /**
129     * Entry point for all tracked commands in the tracker.  Commands should be tracked before
130     * there is an attempt to send them on the wire.  Upon a successful send of a command it is
131     * necessary to call the trackBack method to complete the tracking of the given command.
132     *
133     * @param command
134     *      The command that is to be tracked by this tracker.
135     *
136     * @return null if the command is not state tracked.
137     *
138     * @throws IOException if an error occurs during setup of the tracking operation.
139     */
140    public Tracked track(Command command) throws IOException {
141        try {
142            return (Tracked)command.visit(this);
143        } catch (IOException e) {
144            throw e;
145        } catch (Throwable e) {
146            throw IOExceptionSupport.create(e);
147        }
148    }
149
150    /**
151     * Completes the two phase tracking operation for a command that is sent on the wire.  Once
152     * the command is sent successfully to complete the tracking operation or otherwise update
153     * the state of the tracker.
154     *
155     * @param command
156     *      The command that was previously provided to the track method.
157     */
158    public void trackBack(Command command) {
159        if (command != null) {
160            if (trackMessages && command.isMessage()) {
161                Message message = (Message) command;
162                if (message.getTransactionId()==null) {
163                    currentCacheSize = currentCacheSize +  message.getSize();
164                }
165            } else if (command instanceof MessagePull) {
166                // We only track one MessagePull per consumer so only add to cache size
167                // when the command has been marked as tracked.
168                if (((MessagePull)command).isTracked()) {
169                    // just needs to be a rough estimate of size, ~4 identifiers
170                    currentCacheSize += MESSAGE_PULL_SIZE;
171                }
172            }
173        }
174    }
175
176    public void restore(Transport transport) throws IOException {
177        // Restore the connections.
178        for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
179            ConnectionState connectionState = iter.next();
180            connectionState.getInfo().setFailoverReconnect(true);
181            if (LOG.isDebugEnabled()) {
182                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
183            }
184            transport.oneway(connectionState.getInfo());
185            restoreTempDestinations(transport, connectionState);
186
187            if (restoreSessions) {
188                restoreSessions(transport, connectionState);
189            }
190
191            if (restoreTransaction) {
192                restoreTransactions(transport, connectionState);
193            }
194        }
195
196        // now flush messages and MessagePull commands.
197        for (Command msg : messageCache.values()) {
198            if (LOG.isDebugEnabled()) {
199                LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
200            }
201            transport.oneway(msg);
202        }
203    }
204
205    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
206        Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
207        for (TransactionState transactionState : connectionState.getTransactionStates()) {
208            if (LOG.isDebugEnabled()) {
209                LOG.debug("tx: " + transactionState.getId());
210            }
211
212            // rollback any completed transactions - no way to know if commit got there
213            // or if reply went missing
214            //
215            if (!transactionState.getCommands().isEmpty()) {
216                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
217                if (lastCommand instanceof TransactionInfo) {
218                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
219                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
220                        if (LOG.isDebugEnabled()) {
221                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
222                        }
223                        toRollback.add(transactionInfo);
224                        continue;
225                    }
226                }
227            }
228
229            // replay short lived producers that may have been involved in the transaction
230            for (ProducerState producerState : transactionState.getProducerStates().values()) {
231                if (LOG.isDebugEnabled()) {
232                    LOG.debug("tx replay producer :" + producerState.getInfo());
233                }
234                transport.oneway(producerState.getInfo());
235            }
236
237            for (Command command : transactionState.getCommands()) {
238                if (LOG.isDebugEnabled()) {
239                    LOG.debug("tx replay: " + command);
240                }
241                transport.oneway(command);
242            }
243
244            for (ProducerState producerState : transactionState.getProducerStates().values()) {
245                if (LOG.isDebugEnabled()) {
246                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
247                }
248                transport.oneway(producerState.getInfo().createRemoveCommand());
249            }
250        }
251
252        for (TransactionInfo command: toRollback) {
253            // respond to the outstanding commit
254            ExceptionResponse response = new ExceptionResponse();
255            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
256            response.setCorrelationId(command.getCommandId());
257            transport.getTransportListener().onCommand(response);
258        }
259    }
260
261    /**
262     * @param transport
263     * @param connectionState
264     * @throws IOException
265     */
266    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
267        // Restore the connection's sessions
268        for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
269            SessionState sessionState = (SessionState)iter2.next();
270            if (LOG.isDebugEnabled()) {
271                LOG.debug("session: " + sessionState.getInfo().getSessionId());
272            }
273            transport.oneway(sessionState.getInfo());
274
275            if (restoreProducers) {
276                restoreProducers(transport, sessionState);
277            }
278
279            if (restoreConsumers) {
280                restoreConsumers(transport, sessionState);
281            }
282        }
283    }
284
285    /**
286     * @param transport
287     * @param sessionState
288     * @throws IOException
289     */
290    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
291        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
292        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
293        final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
294        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
295            ConsumerInfo infoToSend = consumerState.getInfo();
296            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
297                infoToSend = consumerState.getInfo().copy();
298                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
299                infoToSend.setPrefetchSize(0);
300                if (LOG.isDebugEnabled()) {
301                    LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
302                }
303            }
304            if (LOG.isDebugEnabled()) {
305                LOG.debug("consumer: " + infoToSend.getConsumerId());
306            }
307            transport.oneway(infoToSend);
308        }
309    }
310
311    /**
312     * @param transport
313     * @param sessionState
314     * @throws IOException
315     */
316    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
317        // Restore the session's producers
318        for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
319            ProducerState producerState = (ProducerState)iter3.next();
320            if (LOG.isDebugEnabled()) {
321                LOG.debug("producer: " + producerState.getInfo().getProducerId());
322            }
323            transport.oneway(producerState.getInfo());
324        }
325    }
326
327    /**
328     * @param transport
329     * @param connectionState
330     * @throws IOException
331     */
332    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
333        throws IOException {
334        // Restore the connection's temp destinations.
335        for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
336            DestinationInfo info = (DestinationInfo)iter2.next();
337            transport.oneway(info);
338            if (LOG.isDebugEnabled()) {
339                LOG.debug("tempDest: " + info.getDestination());
340            }
341        }
342    }
343
344    @Override
345    public Response processAddDestination(DestinationInfo info) {
346        if (info != null) {
347            ConnectionState cs = connectionStates.get(info.getConnectionId());
348            if (cs != null && info.getDestination().isTemporary()) {
349                cs.addTempDestination(info);
350            }
351        }
352        return TRACKED_RESPONSE_MARKER;
353    }
354
355    @Override
356    public Response processRemoveDestination(DestinationInfo info) {
357        if (info != null) {
358            ConnectionState cs = connectionStates.get(info.getConnectionId());
359            if (cs != null && info.getDestination().isTemporary()) {
360                cs.removeTempDestination(info.getDestination());
361            }
362        }
363        return TRACKED_RESPONSE_MARKER;
364    }
365
366    @Override
367    public Response processAddProducer(ProducerInfo info) {
368        if (info != null && info.getProducerId() != null) {
369            SessionId sessionId = info.getProducerId().getParentId();
370            if (sessionId != null) {
371                ConnectionId connectionId = sessionId.getParentId();
372                if (connectionId != null) {
373                    ConnectionState cs = connectionStates.get(connectionId);
374                    if (cs != null) {
375                        SessionState ss = cs.getSessionState(sessionId);
376                        if (ss != null) {
377                            ss.addProducer(info);
378                        }
379                    }
380                }
381            }
382        }
383        return TRACKED_RESPONSE_MARKER;
384    }
385
386    @Override
387    public Response processRemoveProducer(ProducerId id) {
388        if (id != null) {
389            SessionId sessionId = id.getParentId();
390            if (sessionId != null) {
391                ConnectionId connectionId = sessionId.getParentId();
392                if (connectionId != null) {
393                    ConnectionState cs = connectionStates.get(connectionId);
394                    if (cs != null) {
395                        SessionState ss = cs.getSessionState(sessionId);
396                        if (ss != null) {
397                            ss.removeProducer(id);
398                        }
399                    }
400                }
401            }
402        }
403        return TRACKED_RESPONSE_MARKER;
404    }
405
406    @Override
407    public Response processAddConsumer(ConsumerInfo info) {
408        if (info != null) {
409            SessionId sessionId = info.getConsumerId().getParentId();
410            if (sessionId != null) {
411                ConnectionId connectionId = sessionId.getParentId();
412                if (connectionId != null) {
413                    ConnectionState cs = connectionStates.get(connectionId);
414                    if (cs != null) {
415                        SessionState ss = cs.getSessionState(sessionId);
416                        if (ss != null) {
417                            ss.addConsumer(info);
418                        }
419                    }
420                }
421            }
422        }
423        return TRACKED_RESPONSE_MARKER;
424    }
425
426    @Override
427    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
428        if (id != null) {
429            SessionId sessionId = id.getParentId();
430            if (sessionId != null) {
431                ConnectionId connectionId = sessionId.getParentId();
432                if (connectionId != null) {
433                    ConnectionState cs = connectionStates.get(connectionId);
434                    if (cs != null) {
435                        SessionState ss = cs.getSessionState(sessionId);
436                        if (ss != null) {
437                            ss.removeConsumer(id);
438                        }
439                        cs.getRecoveringPullConsumers().remove(id);
440                    }
441                }
442            }
443        }
444        return TRACKED_RESPONSE_MARKER;
445    }
446
447    @Override
448    public Response processAddSession(SessionInfo info) {
449        if (info != null) {
450            ConnectionId connectionId = info.getSessionId().getParentId();
451            if (connectionId != null) {
452                ConnectionState cs = connectionStates.get(connectionId);
453                if (cs != null) {
454                    cs.addSession(info);
455                }
456            }
457        }
458        return TRACKED_RESPONSE_MARKER;
459    }
460
461    @Override
462    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
463        if (id != null) {
464            ConnectionId connectionId = id.getParentId();
465            if (connectionId != null) {
466                ConnectionState cs = connectionStates.get(connectionId);
467                if (cs != null) {
468                    cs.removeSession(id);
469                }
470            }
471        }
472        return TRACKED_RESPONSE_MARKER;
473    }
474
475    @Override
476    public Response processAddConnection(ConnectionInfo info) {
477        if (info != null) {
478            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
479        }
480        return TRACKED_RESPONSE_MARKER;
481    }
482
483    @Override
484    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
485        if (id != null) {
486            connectionStates.remove(id);
487        }
488        return TRACKED_RESPONSE_MARKER;
489    }
490
491    @Override
492    public Response processMessage(Message send) throws Exception {
493        if (send != null) {
494            if (trackTransactions && send.getTransactionId() != null) {
495                ProducerId producerId = send.getProducerId();
496                ConnectionId connectionId = producerId.getParentId().getParentId();
497                if (connectionId != null) {
498                    ConnectionState cs = connectionStates.get(connectionId);
499                    if (cs != null) {
500                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
501                        if (transactionState != null) {
502                            transactionState.addCommand(send);
503
504                            if (trackTransactionProducers) {
505                                // for jmstemplate, track the producer in case it is closed before commit
506                                // and needs to be replayed
507                                SessionState ss = cs.getSessionState(producerId.getParentId());
508                                ProducerState producerState = ss.getProducerState(producerId);
509                                producerState.setTransactionState(transactionState);
510                            }
511                        }
512                    }
513                }
514                return TRACKED_RESPONSE_MARKER;
515            }else if (trackMessages) {
516                messageCache.put(send.getMessageId(), send);
517            }
518        }
519        return null;
520    }
521
522    @Override
523    public Response processBeginTransaction(TransactionInfo info) {
524        if (trackTransactions && info != null && info.getTransactionId() != null) {
525            ConnectionId connectionId = info.getConnectionId();
526            if (connectionId != null) {
527                ConnectionState cs = connectionStates.get(connectionId);
528                if (cs != null) {
529                    cs.addTransactionState(info.getTransactionId());
530                    TransactionState state = cs.getTransactionState(info.getTransactionId());
531                    state.addCommand(info);
532                }
533            }
534            return TRACKED_RESPONSE_MARKER;
535        }
536        return null;
537    }
538
539    @Override
540    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
541        if (trackTransactions && info != null && info.getTransactionId() != null) {
542            ConnectionId connectionId = info.getConnectionId();
543            if (connectionId != null) {
544                ConnectionState cs = connectionStates.get(connectionId);
545                if (cs != null) {
546                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
547                    if (transactionState != null) {
548                        transactionState.addCommand(info);
549                        return new Tracked(new PrepareReadonlyTransactionAction(info));
550                    }
551                }
552            }
553        }
554        return null;
555    }
556
557    @Override
558    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
559        if (trackTransactions && info != null && info.getTransactionId() != null) {
560            ConnectionId connectionId = info.getConnectionId();
561            if (connectionId != null) {
562                ConnectionState cs = connectionStates.get(connectionId);
563                if (cs != null) {
564                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
565                    if (transactionState != null) {
566                        transactionState.addCommand(info);
567                        return new Tracked(new RemoveTransactionAction(info));
568                    }
569                }
570            }
571        }
572        return null;
573    }
574
575    @Override
576    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
577        if (trackTransactions && info != null && info.getTransactionId() != null) {
578            ConnectionId connectionId = info.getConnectionId();
579            if (connectionId != null) {
580                ConnectionState cs = connectionStates.get(connectionId);
581                if (cs != null) {
582                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
583                    if (transactionState != null) {
584                        transactionState.addCommand(info);
585                        return new Tracked(new RemoveTransactionAction(info));
586                    }
587                }
588            }
589        }
590        return null;
591    }
592
593    @Override
594    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
595        if (trackTransactions && info != null && info.getTransactionId() != null) {
596            ConnectionId connectionId = info.getConnectionId();
597            if (connectionId != null) {
598                ConnectionState cs = connectionStates.get(connectionId);
599                if (cs != null) {
600                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
601                    if (transactionState != null) {
602                        transactionState.addCommand(info);
603                        return new Tracked(new RemoveTransactionAction(info));
604                    }
605                }
606            }
607        }
608        return null;
609    }
610
611    @Override
612    public Response processEndTransaction(TransactionInfo info) throws Exception {
613        if (trackTransactions && info != null && info.getTransactionId() != null) {
614            ConnectionId connectionId = info.getConnectionId();
615            if (connectionId != null) {
616                ConnectionState cs = connectionStates.get(connectionId);
617                if (cs != null) {
618                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
619                    if (transactionState != null) {
620                        transactionState.addCommand(info);
621                    }
622                }
623            }
624            return TRACKED_RESPONSE_MARKER;
625        }
626        return null;
627    }
628
629    @Override
630    public Response processMessagePull(MessagePull pull) throws Exception {
631        if (pull != null) {
632            // leave a single instance in the cache
633            final String id = pull.getDestination() + "::" + pull.getConsumerId();
634            if (messageCache.put(id.intern(), pull) == null) {
635                // Only marked as tracked if this is the first request we've seen.
636                pull.setTracked(true);
637            }
638        }
639        return null;
640    }
641
642    public boolean isRestoreConsumers() {
643        return restoreConsumers;
644    }
645
646    public void setRestoreConsumers(boolean restoreConsumers) {
647        this.restoreConsumers = restoreConsumers;
648    }
649
650    public boolean isRestoreProducers() {
651        return restoreProducers;
652    }
653
654    public void setRestoreProducers(boolean restoreProducers) {
655        this.restoreProducers = restoreProducers;
656    }
657
658    public boolean isRestoreSessions() {
659        return restoreSessions;
660    }
661
662    public void setRestoreSessions(boolean restoreSessions) {
663        this.restoreSessions = restoreSessions;
664    }
665
666    public boolean isTrackTransactions() {
667        return trackTransactions;
668    }
669
670    public void setTrackTransactions(boolean trackTransactions) {
671        this.trackTransactions = trackTransactions;
672    }
673
674    public boolean isTrackTransactionProducers() {
675        return this.trackTransactionProducers;
676    }
677
678    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
679        this.trackTransactionProducers = trackTransactionProducers;
680    }
681
682    public boolean isRestoreTransaction() {
683        return restoreTransaction;
684    }
685
686    public void setRestoreTransaction(boolean restoreTransaction) {
687        this.restoreTransaction = restoreTransaction;
688    }
689
690    public boolean isTrackMessages() {
691        return trackMessages;
692    }
693
694    public void setTrackMessages(boolean trackMessages) {
695        this.trackMessages = trackMessages;
696    }
697
698    public int getMaxCacheSize() {
699        return maxCacheSize;
700    }
701
702    public void setMaxCacheSize(int maxCacheSize) {
703        this.maxCacheSize = maxCacheSize;
704    }
705
706    /**
707     * @return the current cache size for the Message and MessagePull Command cache.
708     */
709    public long getCurrentCacheSize() {
710        return this.currentCacheSize;
711    }
712
713    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
714        ConnectionState connectionState = connectionStates.get(connectionId);
715        if (connectionState != null) {
716            connectionState.setConnectionInterruptProcessingComplete(true);
717            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
718            for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
719                ConsumerControl control = new ConsumerControl();
720                control.setConsumerId(entry.getKey());
721                control.setPrefetch(entry.getValue().getPrefetchSize());
722                control.setDestination(entry.getValue().getDestination());
723                try {
724                    if (LOG.isDebugEnabled()) {
725                        LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
726                    }
727                    transport.oneway(control);
728                } catch (Exception ex) {
729                    if (LOG.isDebugEnabled()) {
730                        LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
731                                + " with: " + control.getPrefetch(), ex);
732                    }
733                }
734            }
735            stalledConsumers.clear();
736        }
737    }
738
739    public void transportInterrupted(ConnectionId connectionId) {
740        ConnectionState connectionState = connectionStates.get(connectionId);
741        if (connectionState != null) {
742            connectionState.setConnectionInterruptProcessingComplete(false);
743        }
744    }
745}