/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAResource;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

053/**
054 * Tracks the state of a connection so a newly established transport can be
055 * re-initialized to the state that was tracked.
056 *
057 *
058 */
059public class ConnectionStateTracker extends CommandVisitorAdapter {
060    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
061
062    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
063    private static final int MESSAGE_PULL_SIZE = 400;
064    protected final ConcurrentMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<>();
065
066    private boolean trackTransactions;
067    private boolean restoreSessions = true;
068    private boolean restoreConsumers = true;
069    private boolean restoreProducers = true;
070    private boolean restoreTransaction = true;
071    private boolean trackMessages = true;
072    private boolean trackTransactionProducers = true;
073    private int maxCacheSize = 128 * 1024;
074    private long currentCacheSize;  // use long to prevent overflow for folks who set high max.
075
076    private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
077        @Override
078        protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
079            boolean result = currentCacheSize > maxCacheSize;
080            if (result) {
081                if (eldest.getValue() instanceof Message) {
082                    currentCacheSize -= ((Message)eldest.getValue()).getSize();
083                } else if (eldest.getValue() instanceof MessagePull) {
084                    currentCacheSize -= MESSAGE_PULL_SIZE;
085                }
086                if (LOG.isTraceEnabled()) {
087                    LOG.trace("removing tracked message: " + eldest.getKey());
088                }
089            }
090            return result;
091        }
092    };
093
094    private class RemoveTransactionAction implements ResponseHandler {
095        private final TransactionInfo info;
096
097        public RemoveTransactionAction(TransactionInfo info) {
098            this.info = info;
099        }
100
101        @Override
102        public void onResponse(Command response) {
103            ConnectionId connectionId = info.getConnectionId();
104            ConnectionState cs = connectionStates.get(connectionId);
105            if (cs != null) {
106                cs.removeTransactionState(info.getTransactionId());
107            }
108        }
109    }
110
111    private final class ExceptionResponseCheckAction implements ResponseHandler {
112        private final Command tracked;
113
114        public ExceptionResponseCheckAction(Command tracked) {
115            this.tracked = tracked;
116        }
117
118        @Override
119        public void onResponse(Command response) {
120            if (ExceptionResponse.DATA_STRUCTURE_TYPE == response.getDataStructureType()) {
121                if (tracked.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
122                    processRemoveConsumer(((ConsumerInfo) tracked).getConsumerId(), 0l);
123                } else if (tracked.getDataStructureType() == ProducerInfo.DATA_STRUCTURE_TYPE) {
124                    processRemoveProducer(((ProducerInfo) tracked).getProducerId());
125                }
126            }
127        }
128    }
129
130    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
131        public PrepareReadonlyTransactionAction(TransactionInfo info) {
132            super(info);
133        }
134
135        @Override
136        public void onResponse(Command command) {
137            if (command instanceof IntegerResponse) {
138                IntegerResponse response = (IntegerResponse) command;
139                if (XAResource.XA_RDONLY == response.getResult()) {
140                    // all done, no commit or rollback from TM
141                    super.onResponse(command);
142                }
143            }
144        }
145    }
146
147    /**
148     * Entry point for all tracked commands in the tracker.  Commands should be tracked before
149     * there is an attempt to send them on the wire.  Upon a successful send of a command it is
150     * necessary to call the trackBack method to complete the tracking of the given command.
151     *
152     * @param command
153     *      The command that is to be tracked by this tracker.
154     *
155     * @return null if the command is not state tracked.
156     *
157     * @throws IOException if an error occurs during setup of the tracking operation.
158     */
159    public Tracked track(Command command) throws IOException {
160        try {
161            return (Tracked)command.visit(this);
162        } catch (IOException e) {
163            throw e;
164        } catch (Throwable e) {
165            throw IOExceptionSupport.create(e);
166        }
167    }
168
169    /**
170     * Completes the two phase tracking operation for a command that is sent on the wire.  Once
171     * the command is sent successfully to complete the tracking operation or otherwise update
172     * the state of the tracker.
173     *
174     * @param command
175     *      The command that was previously provided to the track method.
176     */
177    public void trackBack(Command command) {
178        if (command != null) {
179            if (trackMessages && command.isMessage()) {
180                Message message = (Message) command;
181                if (message.getTransactionId()==null) {
182                    currentCacheSize = currentCacheSize +  message.getSize();
183                }
184            } else if (command instanceof MessagePull) {
185                // We only track one MessagePull per consumer so only add to cache size
186                // when the command has been marked as tracked.
187                if (((MessagePull)command).isTracked()) {
188                    // just needs to be a rough estimate of size, ~4 identifiers
189                    currentCacheSize += MESSAGE_PULL_SIZE;
190                }
191            }
192        }
193    }
194
195    public void restore(Transport transport) throws IOException {
196        // Restore the connections.
197        for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
198            ConnectionState connectionState = iter.next();
199            connectionState.getInfo().setFailoverReconnect(true);
200            if (LOG.isDebugEnabled()) {
201                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
202            }
203            transport.oneway(connectionState.getInfo());
204            restoreTempDestinations(transport, connectionState);
205
206            if (restoreSessions) {
207                restoreSessions(transport, connectionState);
208            }
209
210            if (restoreTransaction) {
211                restoreTransactions(transport, connectionState);
212            }
213        }
214
215        // now flush messages and MessagePull commands.
216        for (Command msg : messageCache.values()) {
217            if (LOG.isDebugEnabled()) {
218                LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
219            }
220            transport.oneway(msg);
221        }
222    }
223
224    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
225        Vector<TransactionInfo> toRollback = new Vector<>();
226        for (TransactionState transactionState : connectionState.getTransactionStates()) {
227            if (LOG.isDebugEnabled()) {
228                LOG.debug("tx: " + transactionState.getId());
229            }
230
231            // rollback any completed transactions - no way to know if commit got there
232            // or if reply went missing
233            //
234            if (!transactionState.getCommands().isEmpty()) {
235                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
236                if (lastCommand instanceof TransactionInfo) {
237                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
238                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
239                        if (LOG.isDebugEnabled()) {
240                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
241                        }
242                        toRollback.add(transactionInfo);
243                        continue;
244                    }
245                }
246            }
247
248            // replay short lived producers that may have been involved in the transaction
249            for (ProducerState producerState : transactionState.getProducerStates().values()) {
250                if (LOG.isDebugEnabled()) {
251                    LOG.debug("tx replay producer :" + producerState.getInfo());
252                }
253                transport.oneway(producerState.getInfo());
254            }
255
256            for (Command command : transactionState.getCommands()) {
257                if (LOG.isDebugEnabled()) {
258                    LOG.debug("tx replay: " + command);
259                }
260                transport.oneway(command);
261            }
262
263            for (ProducerState producerState : transactionState.getProducerStates().values()) {
264                if (LOG.isDebugEnabled()) {
265                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
266                }
267                transport.oneway(producerState.getInfo().createRemoveCommand());
268            }
269        }
270
271        for (TransactionInfo command: toRollback) {
272            // respond to the outstanding commit
273            ExceptionResponse response = new ExceptionResponse();
274            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
275            response.setCorrelationId(command.getCommandId());
276            transport.getTransportListener().onCommand(response);
277        }
278    }
279
280    /**
281     * @param transport
282     * @param connectionState
283     * @throws IOException
284     */
285    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
286        // Restore the connection's sessions
287        for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
288            SessionState sessionState = (SessionState)iter2.next();
289            if (LOG.isDebugEnabled()) {
290                LOG.debug("session: " + sessionState.getInfo().getSessionId());
291            }
292            transport.oneway(sessionState.getInfo());
293
294            if (restoreProducers) {
295                restoreProducers(transport, sessionState);
296            }
297
298            if (restoreConsumers) {
299                restoreConsumers(transport, sessionState);
300            }
301        }
302    }
303
304    /**
305     * @param transport
306     * @param sessionState
307     * @throws IOException
308     */
309    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
310        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
311        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
312        final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
313        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
314            ConsumerInfo infoToSend = consumerState.getInfo();
315            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
316                infoToSend = consumerState.getInfo().copy();
317                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
318                infoToSend.setPrefetchSize(0);
319                if (LOG.isDebugEnabled()) {
320                    LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
321                }
322            }
323            if (LOG.isDebugEnabled()) {
324                LOG.debug("consumer: " + infoToSend.getConsumerId());
325            }
326            transport.oneway(infoToSend);
327        }
328    }
329
330    /**
331     * @param transport
332     * @param sessionState
333     * @throws IOException
334     */
335    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
336        // Restore the session's producers
337        for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
338            ProducerState producerState = (ProducerState)iter3.next();
339            if (LOG.isDebugEnabled()) {
340                LOG.debug("producer: " + producerState.getInfo().getProducerId());
341            }
342            transport.oneway(producerState.getInfo());
343        }
344    }
345
346    /**
347     * @param transport
348     * @param connectionState
349     * @throws IOException
350     */
351    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
352        throws IOException {
353        // Restore the connection's temp destinations.
354        for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
355            DestinationInfo info = (DestinationInfo)iter2.next();
356            transport.oneway(info);
357            if (LOG.isDebugEnabled()) {
358                LOG.debug("tempDest: " + info.getDestination());
359            }
360        }
361    }
362
363    @Override
364    public Response processAddDestination(DestinationInfo info) {
365        if (info != null) {
366            ConnectionState cs = connectionStates.get(info.getConnectionId());
367            if (cs != null && info.getDestination().isTemporary()) {
368                cs.addTempDestination(info);
369            }
370        }
371        return TRACKED_RESPONSE_MARKER;
372    }
373
374    @Override
375    public Response processRemoveDestination(DestinationInfo info) {
376        if (info != null) {
377            ConnectionState cs = connectionStates.get(info.getConnectionId());
378            if (cs != null && info.getDestination().isTemporary()) {
379                cs.removeTempDestination(info.getDestination());
380            }
381        }
382        return TRACKED_RESPONSE_MARKER;
383    }
384
385    @Override
386    public Response processAddProducer(ProducerInfo info) {
387        if (info != null && info.getProducerId() != null) {
388            SessionId sessionId = info.getProducerId().getParentId();
389            if (sessionId != null) {
390                ConnectionId connectionId = sessionId.getParentId();
391                if (connectionId != null) {
392                    ConnectionState cs = connectionStates.get(connectionId);
393                    if (cs != null) {
394                        SessionState ss = cs.getSessionState(sessionId);
395                        if (ss != null) {
396                            ss.addProducer(info);
397                            if (info.isResponseRequired()) {
398                                return new Tracked(new ExceptionResponseCheckAction(info));
399                            }
400                        }
401                    }
402                }
403            }
404        }
405        return TRACKED_RESPONSE_MARKER;
406    }
407
408    @Override
409    public Response processRemoveProducer(ProducerId id) {
410        if (id != null) {
411            SessionId sessionId = id.getParentId();
412            if (sessionId != null) {
413                ConnectionId connectionId = sessionId.getParentId();
414                if (connectionId != null) {
415                    ConnectionState cs = connectionStates.get(connectionId);
416                    if (cs != null) {
417                        SessionState ss = cs.getSessionState(sessionId);
418                        if (ss != null) {
419                            ss.removeProducer(id);
420                        }
421                    }
422                }
423            }
424        }
425        return TRACKED_RESPONSE_MARKER;
426    }
427
428    @Override
429    public Response processAddConsumer(ConsumerInfo info) {
430        if (info != null) {
431            SessionId sessionId = info.getConsumerId().getParentId();
432            if (sessionId != null) {
433                ConnectionId connectionId = sessionId.getParentId();
434                if (connectionId != null) {
435                    ConnectionState cs = connectionStates.get(connectionId);
436                    if (cs != null) {
437                        SessionState ss = cs.getSessionState(sessionId);
438                        if (ss != null) {
439                            ss.addConsumer(info);
440                            if (info.isResponseRequired()) {
441                                return new Tracked(new ExceptionResponseCheckAction(info));
442                            }
443                        }
444                    }
445                }
446            }
447        }
448        return TRACKED_RESPONSE_MARKER;
449    }
450
451    @Override
452    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
453        if (id != null) {
454            SessionId sessionId = id.getParentId();
455            if (sessionId != null) {
456                ConnectionId connectionId = sessionId.getParentId();
457                if (connectionId != null) {
458                    ConnectionState cs = connectionStates.get(connectionId);
459                    if (cs != null) {
460                        SessionState ss = cs.getSessionState(sessionId);
461                        if (ss != null) {
462                            ss.removeConsumer(id);
463                        }
464                        cs.getRecoveringPullConsumers().remove(id);
465                    }
466                }
467            }
468        }
469        return TRACKED_RESPONSE_MARKER;
470    }
471
472    @Override
473    public Response processAddSession(SessionInfo info) {
474        if (info != null) {
475            ConnectionId connectionId = info.getSessionId().getParentId();
476            if (connectionId != null) {
477                ConnectionState cs = connectionStates.get(connectionId);
478                if (cs != null) {
479                    cs.addSession(info);
480                }
481            }
482        }
483        return TRACKED_RESPONSE_MARKER;
484    }
485
486    @Override
487    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
488        if (id != null) {
489            ConnectionId connectionId = id.getParentId();
490            if (connectionId != null) {
491                ConnectionState cs = connectionStates.get(connectionId);
492                if (cs != null) {
493                    cs.removeSession(id);
494                }
495            }
496        }
497        return TRACKED_RESPONSE_MARKER;
498    }
499
500    @Override
501    public Response processAddConnection(ConnectionInfo info) {
502        if (info != null) {
503            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
504        }
505        return TRACKED_RESPONSE_MARKER;
506    }
507
508    @Override
509    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
510        if (id != null) {
511            connectionStates.remove(id);
512        }
513        return TRACKED_RESPONSE_MARKER;
514    }
515
516    @Override
517    public Response processMessage(Message send) throws Exception {
518        if (send != null) {
519            if (trackTransactions && send.getTransactionId() != null) {
520                ProducerId producerId = send.getProducerId();
521                ConnectionId connectionId = producerId.getParentId().getParentId();
522                if (connectionId != null) {
523                    ConnectionState cs = connectionStates.get(connectionId);
524                    if (cs != null) {
525                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
526                        if (transactionState != null) {
527                            transactionState.addCommand(send);
528
529                            if (trackTransactionProducers) {
530                                // for jmstemplate, track the producer in case it is closed before commit
531                                // and needs to be replayed
532                                SessionState ss = cs.getSessionState(producerId.getParentId());
533                                ProducerState producerState = ss.getProducerState(producerId);
534                                producerState.setTransactionState(transactionState);
535                            }
536                        }
537                    }
538                }
539                return TRACKED_RESPONSE_MARKER;
540            }else if (trackMessages) {
541                messageCache.put(send.getMessageId(), send);
542            }
543        }
544        return null;
545    }
546
547    @Override
548    public Response processBeginTransaction(TransactionInfo info) {
549        if (trackTransactions && info != null && info.getTransactionId() != null) {
550            ConnectionId connectionId = info.getConnectionId();
551            if (connectionId != null) {
552                ConnectionState cs = connectionStates.get(connectionId);
553                if (cs != null) {
554                    cs.addTransactionState(info.getTransactionId());
555                    TransactionState state = cs.getTransactionState(info.getTransactionId());
556                    state.addCommand(info);
557                }
558            }
559            return TRACKED_RESPONSE_MARKER;
560        }
561        return null;
562    }
563
564    @Override
565    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
566        if (trackTransactions && info != null && info.getTransactionId() != null) {
567            ConnectionId connectionId = info.getConnectionId();
568            if (connectionId != null) {
569                ConnectionState cs = connectionStates.get(connectionId);
570                if (cs != null) {
571                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
572                    if (transactionState != null) {
573                        transactionState.addCommand(info);
574                        return new Tracked(new PrepareReadonlyTransactionAction(info));
575                    }
576                }
577            }
578        }
579        return null;
580    }
581
582    @Override
583    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
584        if (trackTransactions && info != null && info.getTransactionId() != null) {
585            ConnectionId connectionId = info.getConnectionId();
586            if (connectionId != null) {
587                ConnectionState cs = connectionStates.get(connectionId);
588                if (cs != null) {
589                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
590                    if (transactionState != null) {
591                        transactionState.addCommand(info);
592                        return new Tracked(new RemoveTransactionAction(info));
593                    }
594                }
595            }
596        }
597        return null;
598    }
599
600    @Override
601    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
602        if (trackTransactions && info != null && info.getTransactionId() != null) {
603            ConnectionId connectionId = info.getConnectionId();
604            if (connectionId != null) {
605                ConnectionState cs = connectionStates.get(connectionId);
606                if (cs != null) {
607                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
608                    if (transactionState != null) {
609                        transactionState.addCommand(info);
610                        return new Tracked(new RemoveTransactionAction(info));
611                    }
612                }
613            }
614        }
615        return null;
616    }
617
618    @Override
619    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
620        if (trackTransactions && info != null && info.getTransactionId() != null) {
621            ConnectionId connectionId = info.getConnectionId();
622            if (connectionId != null) {
623                ConnectionState cs = connectionStates.get(connectionId);
624                if (cs != null) {
625                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
626                    if (transactionState != null) {
627                        transactionState.addCommand(info);
628                        return new Tracked(new RemoveTransactionAction(info));
629                    }
630                }
631            }
632        }
633        return null;
634    }
635
636    @Override
637    public Response processEndTransaction(TransactionInfo info) throws Exception {
638        if (trackTransactions && info != null && info.getTransactionId() != null) {
639            ConnectionId connectionId = info.getConnectionId();
640            if (connectionId != null) {
641                ConnectionState cs = connectionStates.get(connectionId);
642                if (cs != null) {
643                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
644                    if (transactionState != null) {
645                        transactionState.addCommand(info);
646                    }
647                }
648            }
649            return TRACKED_RESPONSE_MARKER;
650        }
651        return null;
652    }
653
654    @Override
655    public Response processMessagePull(MessagePull pull) throws Exception {
656        if (pull != null) {
657            // leave a single instance in the cache
658            final String id = pull.getDestination() + "::" + pull.getConsumerId();
659            if (messageCache.put(id.intern(), pull) == null) {
660                // Only marked as tracked if this is the first request we've seen.
661                pull.setTracked(true);
662            }
663        }
664        return null;
665    }
666
667    public boolean isRestoreConsumers() {
668        return restoreConsumers;
669    }
670
671    public void setRestoreConsumers(boolean restoreConsumers) {
672        this.restoreConsumers = restoreConsumers;
673    }
674
675    public boolean isRestoreProducers() {
676        return restoreProducers;
677    }
678
679    public void setRestoreProducers(boolean restoreProducers) {
680        this.restoreProducers = restoreProducers;
681    }
682
683    public boolean isRestoreSessions() {
684        return restoreSessions;
685    }
686
687    public void setRestoreSessions(boolean restoreSessions) {
688        this.restoreSessions = restoreSessions;
689    }
690
691    public boolean isTrackTransactions() {
692        return trackTransactions;
693    }
694
695    public void setTrackTransactions(boolean trackTransactions) {
696        this.trackTransactions = trackTransactions;
697    }
698
699    public boolean isTrackTransactionProducers() {
700        return this.trackTransactionProducers;
701    }
702
703    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
704        this.trackTransactionProducers = trackTransactionProducers;
705    }
706
707    public boolean isRestoreTransaction() {
708        return restoreTransaction;
709    }
710
711    public void setRestoreTransaction(boolean restoreTransaction) {
712        this.restoreTransaction = restoreTransaction;
713    }
714
715    public boolean isTrackMessages() {
716        return trackMessages;
717    }
718
719    public void setTrackMessages(boolean trackMessages) {
720        this.trackMessages = trackMessages;
721    }
722
723    public int getMaxCacheSize() {
724        return maxCacheSize;
725    }
726
727    public void setMaxCacheSize(int maxCacheSize) {
728        this.maxCacheSize = maxCacheSize;
729    }
730
731    /**
732     * @return the current cache size for the Message and MessagePull Command cache.
733     */
734    public long getCurrentCacheSize() {
735        return this.currentCacheSize;
736    }
737
738    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
739        ConnectionState connectionState = connectionStates.get(connectionId);
740        if (connectionState != null) {
741            connectionState.setConnectionInterruptProcessingComplete(true);
742            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
743            for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
744                ConsumerControl control = new ConsumerControl();
745                control.setConsumerId(entry.getKey());
746                control.setPrefetch(entry.getValue().getPrefetchSize());
747                control.setDestination(entry.getValue().getDestination());
748                try {
749                    if (LOG.isDebugEnabled()) {
750                        LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
751                    }
752                    transport.oneway(control);
753                } catch (Exception ex) {
754                    if (LOG.isDebugEnabled()) {
755                        LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
756                                + " with: " + control.getPrefetch(), ex);
757                    }
758                }
759            }
760            stalledConsumers.clear();
761        }
762    }
763
764    public void transportInterrupted(ConnectionId connectionId) {
765        ConnectionState connectionState = connectionStates.get(connectionId);
766        if (connectionState != null) {
767            connectionState.setConnectionInterruptProcessingComplete(false);
768        }
769    }
770}