001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Properties;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import javax.transaction.xa.XAResource;
040
041import org.apache.activemq.advisory.AdvisorySupport;
042import org.apache.activemq.broker.region.ConnectionStatistics;
043import org.apache.activemq.broker.region.RegionBroker;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.BrokerInfo;
046import org.apache.activemq.command.Command;
047import org.apache.activemq.command.CommandTypes;
048import org.apache.activemq.command.ConnectionControl;
049import org.apache.activemq.command.ConnectionError;
050import org.apache.activemq.command.ConnectionId;
051import org.apache.activemq.command.ConnectionInfo;
052import org.apache.activemq.command.ConsumerControl;
053import org.apache.activemq.command.ConsumerId;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.ControlCommand;
056import org.apache.activemq.command.DataArrayResponse;
057import org.apache.activemq.command.DestinationInfo;
058import org.apache.activemq.command.ExceptionResponse;
059import org.apache.activemq.command.FlushCommand;
060import org.apache.activemq.command.IntegerResponse;
061import org.apache.activemq.command.KeepAliveInfo;
062import org.apache.activemq.command.Message;
063import org.apache.activemq.command.MessageAck;
064import org.apache.activemq.command.MessageDispatch;
065import org.apache.activemq.command.MessageDispatchNotification;
066import org.apache.activemq.command.MessagePull;
067import org.apache.activemq.command.ProducerAck;
068import org.apache.activemq.command.ProducerId;
069import org.apache.activemq.command.ProducerInfo;
070import org.apache.activemq.command.RemoveInfo;
071import org.apache.activemq.command.RemoveSubscriptionInfo;
072import org.apache.activemq.command.Response;
073import org.apache.activemq.command.SessionId;
074import org.apache.activemq.command.SessionInfo;
075import org.apache.activemq.command.ShutdownInfo;
076import org.apache.activemq.command.TransactionId;
077import org.apache.activemq.command.TransactionInfo;
078import org.apache.activemq.command.WireFormatInfo;
079import org.apache.activemq.network.DemandForwardingBridge;
080import org.apache.activemq.network.MBeanNetworkListener;
081import org.apache.activemq.network.NetworkBridgeConfiguration;
082import org.apache.activemq.network.NetworkBridgeFactory;
083import org.apache.activemq.security.MessageAuthorizationPolicy;
084import org.apache.activemq.state.CommandVisitor;
085import org.apache.activemq.state.ConnectionState;
086import org.apache.activemq.state.ConsumerState;
087import org.apache.activemq.state.ProducerState;
088import org.apache.activemq.state.SessionState;
089import org.apache.activemq.state.TransactionState;
090import org.apache.activemq.thread.Task;
091import org.apache.activemq.thread.TaskRunner;
092import org.apache.activemq.thread.TaskRunnerFactory;
093import org.apache.activemq.transaction.Transaction;
094import org.apache.activemq.transport.DefaultTransportListener;
095import org.apache.activemq.transport.ResponseCorrelator;
096import org.apache.activemq.transport.TransmitCallback;
097import org.apache.activemq.transport.Transport;
098import org.apache.activemq.transport.TransportDisposedIOException;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.MarshallingSupport;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103import org.slf4j.MDC;
104
105public class TransportConnection implements Connection, Task, CommandVisitor {
106    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
107    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
108    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
109    // Keeps track of the broker and connector that created this connection.
110    protected final Broker broker;
111    protected final BrokerService brokerService;
112    protected final TransportConnector connector;
113    // Keeps track of the state of the connections.
114    // protected final ConcurrentHashMap localConnectionStates=new
115    // ConcurrentHashMap();
116    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
117    // The broker and wireformat info that was exchanged.
118    protected BrokerInfo brokerInfo;
119    protected final List<Command> dispatchQueue = new LinkedList<Command>();
120    protected TaskRunner taskRunner;
121    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
122    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
123    private final Transport transport;
124    private MessageAuthorizationPolicy messageAuthorizationPolicy;
125    private WireFormatInfo wireFormatInfo;
126    // Used to do async dispatch.. this should perhaps be pushed down into the
127    // transport layer..
128    private boolean inServiceException;
129    private final ConnectionStatistics statistics = new ConnectionStatistics();
130    private boolean manageable;
131    private boolean slow;
132    private boolean markedCandidate;
133    private boolean blockedCandidate;
134    private boolean blocked;
135    private boolean connected;
136    private boolean active;
137    private final AtomicBoolean starting = new AtomicBoolean();
138    private final AtomicBoolean pendingStop = new AtomicBoolean();
139    private long timeStamp;
140    private final AtomicBoolean stopping = new AtomicBoolean(false);
141    private final CountDownLatch stopped = new CountDownLatch(1);
142    private final AtomicBoolean asyncException = new AtomicBoolean(false);
143    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
144    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
145    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
146    private ConnectionContext context;
147    private boolean networkConnection;
148    private boolean faultTolerantConnection;
149    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
150    private DemandForwardingBridge duplexBridge;
151    private final TaskRunnerFactory taskRunnerFactory;
152    private final TaskRunnerFactory stopTaskRunnerFactory;
153    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
154    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
155    private String duplexNetworkConnectorId;
156
157    /**
158     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
159     *                          else commands are sent async.
160     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
161     */
162    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
163                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
164        this.connector = connector;
165        this.broker = broker;
166        this.brokerService = broker.getBrokerService();
167
168        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
169        brokerConnectionStates = rb.getConnectionStates();
170        if (connector != null) {
171            this.statistics.setParent(connector.getStatistics());
172            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
173        }
174        this.taskRunnerFactory = taskRunnerFactory;
175        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
176        this.transport = transport;
177        if( this.transport instanceof BrokerServiceAware ) {
178            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
179        }
180        this.transport.setTransportListener(new DefaultTransportListener() {
181            @Override
182            public void onCommand(Object o) {
183                serviceLock.readLock().lock();
184                try {
185                    if (!(o instanceof Command)) {
186                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
187                    }
188                    Command command = (Command) o;
189                    if (!brokerService.isStopping()) {
190                        Response response = service(command);
191                        if (response != null && !brokerService.isStopping()) {
192                            dispatchSync(response);
193                        }
194                    } else {
195                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
196                    }
197                } finally {
198                    serviceLock.readLock().unlock();
199                }
200            }
201
202            @Override
203            public void onException(IOException exception) {
204                serviceLock.readLock().lock();
205                try {
206                    serviceTransportException(exception);
207                } finally {
208                    serviceLock.readLock().unlock();
209                }
210            }
211        });
212        connected = true;
213    }
214
215    /**
216     * Returns the number of messages to be dispatched to this connection
217     *
218     * @return size of dispatch queue
219     */
220    @Override
221    public int getDispatchQueueSize() {
222        synchronized (dispatchQueue) {
223            return dispatchQueue.size();
224        }
225    }
226
227    public void serviceTransportException(IOException e) {
228        if (!stopping.get() && !pendingStop.get()) {
229            transportException.set(e);
230            if (TRANSPORTLOG.isDebugEnabled()) {
231                TRANSPORTLOG.debug(this + " failed: " + e, e);
232            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
233                TRANSPORTLOG.warn(this + " failed: " + e);
234            }
235            stopAsync(e);
236        }
237    }
238
239    private boolean expected(IOException e) {
240        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
241    }
242
243    private boolean isStomp() {
244        URI uri = connector.getUri();
245        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
246    }
247
248    /**
249     * Calls the serviceException method in an async thread. Since handling a
250     * service exception closes a socket, we should not tie up broker threads
251     * since client sockets may hang or cause deadlocks.
252     */
253    @Override
254    public void serviceExceptionAsync(final IOException e) {
255        if (asyncException.compareAndSet(false, true)) {
256            new Thread("Async Exception Handler") {
257                @Override
258                public void run() {
259                    serviceException(e);
260                }
261            }.start();
262        }
263    }
264
265    /**
266     * Closes a clients connection due to a detected error. Errors are ignored
267     * if: the client is closing or broker is closing. Otherwise, the connection
268     * error transmitted to the client before stopping it's transport.
269     */
270    @Override
271    public void serviceException(Throwable e) {
272        // are we a transport exception such as not being able to dispatch
273        // synchronously to a transport
274        if (e instanceof IOException) {
275            serviceTransportException((IOException) e);
276        } else if (e.getClass() == BrokerStoppedException.class) {
277            // Handle the case where the broker is stopped
278            // But the client is still connected.
279            if (!stopping.get()) {
280                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
281                ConnectionError ce = new ConnectionError();
282                ce.setException(e);
283                dispatchSync(ce);
284                // Record the error that caused the transport to stop
285                transportException.set(e);
286                // Wait a little bit to try to get the output buffer to flush
287                // the exception notification to the client.
288                try {
289                    Thread.sleep(500);
290                } catch (InterruptedException ie) {
291                    Thread.currentThread().interrupt();
292                }
293                // Worst case is we just kill the connection before the
294                // notification gets to him.
295                stopAsync();
296            }
297        } else if (!stopping.get() && !inServiceException) {
298            inServiceException = true;
299            try {
300                if (SERVICELOG.isDebugEnabled()) {
301                    SERVICELOG.debug("Async error occurred: " + e, e);
302                } else {
303                    SERVICELOG.warn("Async error occurred: " + e);
304                }
305                ConnectionError ce = new ConnectionError();
306                ce.setException(e);
307                if (pendingStop.get()) {
308                    dispatchSync(ce);
309                } else {
310                    dispatchAsync(ce);
311                }
312            } finally {
313                inServiceException = false;
314            }
315        }
316    }
317
318    @Override
319    public Response service(Command command) {
320        MDC.put("activemq.connector", connector.getUri().toString());
321        Response response = null;
322        boolean responseRequired = command.isResponseRequired();
323        int commandId = command.getCommandId();
324        try {
325            if (!pendingStop.get()) {
326                response = command.visit(this);
327            } else {
328                response = new ExceptionResponse(transportException.get());
329            }
330        } catch (Throwable e) {
331            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
332                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
333                        + " command: " + command + ", exception: " + e, e);
334            }
335
336            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
337                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
338                responseRequired = false;
339            }
340
341            if (responseRequired) {
342                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
343                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
344                            transport.getRemoteAddress(), e.getMessage());
345                }
346                response = new ExceptionResponse(e);
347            } else {
348                forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
349                serviceException(e);
350            }
351        }
352        if (responseRequired) {
353            if (response == null) {
354                response = new Response();
355            }
356            response.setCorrelationId(commandId);
357        }
358        // The context may have been flagged so that the response is not
359        // sent.
360        if (context != null) {
361            if (context.isDontSendReponse()) {
362                context.setDontSendReponse(false);
363                response = null;
364            }
365            context = null;
366        }
367        MDC.remove("activemq.connector");
368        return response;
369    }
370
371    private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) {
372        if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
373            Transaction transaction = getActiveTransaction(command);
374            if (transaction != null && !transaction.isRollbackOnly()) {
375                LOG.debug("on async exception, force rollback of transaction for: " + command, e);
376                transaction.setRollbackOnly(e);
377            }
378        }
379    }
380
381    private Transaction getActiveTransaction(Command command) {
382        Transaction transaction = null;
383        try {
384            if (command instanceof Message) {
385                Message messageSend = (Message) command;
386                ProducerId producerId = messageSend.getProducerId();
387                ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
388                transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
389            } else if (command instanceof  MessageAck) {
390                MessageAck messageAck = (MessageAck) command;
391                ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
392                if (consumerExchange != null) {
393                    transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
394                }
395            }
396        } catch(Exception ignored){
397            LOG.trace("failed to find active transaction for command: " + command, ignored);
398        }
399        return transaction;
400    }
401
402    private boolean isInTransaction(Command command) {
403        return command instanceof Message && ((Message)command).isInTransaction()
404                || command instanceof MessageAck && ((MessageAck)command).isInTransaction();
405    }
406
407    @Override
408    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
409        return null;
410    }
411
412    @Override
413    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
414        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
415        return null;
416    }
417
418    @Override
419    public Response processWireFormat(WireFormatInfo info) throws Exception {
420        wireFormatInfo = info;
421        protocolVersion.set(info.getVersion());
422        return null;
423    }
424
425    @Override
426    public Response processShutdown(ShutdownInfo info) throws Exception {
427        stopAsync();
428        return null;
429    }
430
431    @Override
432    public Response processFlush(FlushCommand command) throws Exception {
433        return null;
434    }
435
436    @Override
437    public Response processBeginTransaction(TransactionInfo info) throws Exception {
438        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
439        context = null;
440        if (cs != null) {
441            context = cs.getContext();
442        }
443        if (cs == null) {
444            throw new NullPointerException("Context is null");
445        }
446        // Avoid replaying dup commands
447        if (cs.getTransactionState(info.getTransactionId()) == null) {
448            cs.addTransactionState(info.getTransactionId());
449            broker.beginTransaction(context, info.getTransactionId());
450        }
451        return null;
452    }
453
454    @Override
455    public int getActiveTransactionCount() {
456        int rc = 0;
457        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
458            Collection<TransactionState> transactions = cs.getTransactionStates();
459            for (TransactionState transaction : transactions) {
460                rc++;
461            }
462        }
463        return rc;
464    }
465
466    @Override
467    public Long getOldestActiveTransactionDuration() {
468        TransactionState oldestTX = null;
469        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
470            Collection<TransactionState> transactions = cs.getTransactionStates();
471            for (TransactionState transaction : transactions) {
472                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
473                    oldestTX = transaction;
474                }
475            }
476        }
477        if( oldestTX == null ) {
478            return null;
479        }
480        return System.currentTimeMillis() - oldestTX.getCreatedAt();
481    }
482
483    @Override
484    public Response processEndTransaction(TransactionInfo info) throws Exception {
485        // No need to do anything. This packet is just sent by the client
486        // make sure he is synced with the server as commit command could
487        // come from a different connection.
488        return null;
489    }
490
491    @Override
492    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
493        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
494        context = null;
495        if (cs != null) {
496            context = cs.getContext();
497        }
498        if (cs == null) {
499            throw new NullPointerException("Context is null");
500        }
501        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
502        if (transactionState == null) {
503            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
504                    + info.getTransactionId());
505        }
506        // Avoid dups.
507        if (!transactionState.isPrepared()) {
508            transactionState.setPrepared(true);
509            int result = broker.prepareTransaction(context, info.getTransactionId());
510            transactionState.setPreparedResult(result);
511            if (result == XAResource.XA_RDONLY) {
512                // we are done, no further rollback or commit from TM
513                cs.removeTransactionState(info.getTransactionId());
514            }
515            IntegerResponse response = new IntegerResponse(result);
516            return response;
517        } else {
518            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
519            return response;
520        }
521    }
522
523    @Override
524    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
525        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
526        context = cs.getContext();
527        cs.removeTransactionState(info.getTransactionId());
528        broker.commitTransaction(context, info.getTransactionId(), true);
529        return null;
530    }
531
532    @Override
533    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
534        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
535        context = cs.getContext();
536        cs.removeTransactionState(info.getTransactionId());
537        broker.commitTransaction(context, info.getTransactionId(), false);
538        return null;
539    }
540
541    @Override
542    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
543        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
544        context = cs.getContext();
545        cs.removeTransactionState(info.getTransactionId());
546        broker.rollbackTransaction(context, info.getTransactionId());
547        return null;
548    }
549
550    @Override
551    public Response processForgetTransaction(TransactionInfo info) throws Exception {
552        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
553        context = cs.getContext();
554        broker.forgetTransaction(context, info.getTransactionId());
555        return null;
556    }
557
558    @Override
559    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
560        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
561        context = cs.getContext();
562        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
563        return new DataArrayResponse(preparedTransactions);
564    }
565
566    @Override
567    public Response processMessage(Message messageSend) throws Exception {
568        ProducerId producerId = messageSend.getProducerId();
569        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
570        if (producerExchange.canDispatch(messageSend)) {
571            broker.send(producerExchange, messageSend);
572        }
573        return null;
574    }
575
576    @Override
577    public Response processMessageAck(MessageAck ack) throws Exception {
578        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
579        if (consumerExchange != null) {
580            broker.acknowledge(consumerExchange, ack);
581        } else if (ack.isInTransaction()) {
582            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
583        }
584        return null;
585    }
586
587    @Override
588    public Response processMessagePull(MessagePull pull) throws Exception {
589        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
590    }
591
592    @Override
593    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
594        broker.processDispatchNotification(notification);
595        return null;
596    }
597
598    @Override
599    public Response processAddDestination(DestinationInfo info) throws Exception {
600        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
601        broker.addDestinationInfo(cs.getContext(), info);
602        if (info.getDestination().isTemporary()) {
603            cs.addTempDestination(info);
604        }
605        return null;
606    }
607
608    @Override
609    public Response processRemoveDestination(DestinationInfo info) throws Exception {
610        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
611        broker.removeDestinationInfo(cs.getContext(), info);
612        if (info.getDestination().isTemporary()) {
613            cs.removeTempDestination(info.getDestination());
614        }
615        return null;
616    }
617
618    @Override
619    public Response processAddProducer(ProducerInfo info) throws Exception {
620        SessionId sessionId = info.getProducerId().getParentId();
621        ConnectionId connectionId = sessionId.getParentId();
622        TransportConnectionState cs = lookupConnectionState(connectionId);
623        if (cs == null) {
624            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
625                    + connectionId);
626        }
627        SessionState ss = cs.getSessionState(sessionId);
628        if (ss == null) {
629            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
630                    + sessionId);
631        }
632        // Avoid replaying dup commands
633        if (!ss.getProducerIds().contains(info.getProducerId())) {
634            ActiveMQDestination destination = info.getDestination();
635            // Do not check for null here as it would cause the count of max producers to exclude
636            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
637            // call it from here with a null Destination value.
638            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
639                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
640                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
641                }
642            }
643            broker.addProducer(cs.getContext(), info);
644            try {
645                ss.addProducer(info);
646            } catch (IllegalStateException e) {
647                broker.removeProducer(cs.getContext(), info);
648            }
649
650        }
651        return null;
652    }
653
654    @Override
655    public Response processRemoveProducer(ProducerId id) throws Exception {
656        SessionId sessionId = id.getParentId();
657        ConnectionId connectionId = sessionId.getParentId();
658        TransportConnectionState cs = lookupConnectionState(connectionId);
659        SessionState ss = cs.getSessionState(sessionId);
660        if (ss == null) {
661            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
662                    + sessionId);
663        }
664        ProducerState ps = ss.removeProducer(id);
665        if (ps == null) {
666            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
667        }
668        removeProducerBrokerExchange(id);
669        broker.removeProducer(cs.getContext(), ps.getInfo());
670        return null;
671    }
672
673    @Override
674    public Response processAddConsumer(ConsumerInfo info) throws Exception {
675        SessionId sessionId = info.getConsumerId().getParentId();
676        ConnectionId connectionId = sessionId.getParentId();
677        TransportConnectionState cs = lookupConnectionState(connectionId);
678        if (cs == null) {
679            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
680                    + connectionId);
681        }
682        SessionState ss = cs.getSessionState(sessionId);
683        if (ss == null) {
684            throw new IllegalStateException(broker.getBrokerName()
685                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
686        }
687        // Avoid replaying dup commands
688        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
689            ActiveMQDestination destination = info.getDestination();
690            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
691                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
692                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
693                }
694            }
695
696            broker.addConsumer(cs.getContext(), info);
697            try {
698                ss.addConsumer(info);
699                addConsumerBrokerExchange(cs, info.getConsumerId());
700            } catch (IllegalStateException e) {
701                broker.removeConsumer(cs.getContext(), info);
702            }
703
704        }
705        return null;
706    }
707
708    @Override
709    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
710        SessionId sessionId = id.getParentId();
711        ConnectionId connectionId = sessionId.getParentId();
712        TransportConnectionState cs = lookupConnectionState(connectionId);
713        if (cs == null) {
714            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
715                    + connectionId);
716        }
717        SessionState ss = cs.getSessionState(sessionId);
718        if (ss == null) {
719            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
720                    + sessionId);
721        }
722        ConsumerState consumerState = ss.removeConsumer(id);
723        if (consumerState == null) {
724            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
725        }
726        ConsumerInfo info = consumerState.getInfo();
727        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
728        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
729        removeConsumerBrokerExchange(id);
730        return null;
731    }
732
733    @Override
734    public Response processAddSession(SessionInfo info) throws Exception {
735        ConnectionId connectionId = info.getSessionId().getParentId();
736        TransportConnectionState cs = lookupConnectionState(connectionId);
737        // Avoid replaying dup commands
738        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
739            broker.addSession(cs.getContext(), info);
740            try {
741                cs.addSession(info);
742            } catch (IllegalStateException e) {
743                e.printStackTrace();
744                broker.removeSession(cs.getContext(), info);
745            }
746        }
747        return null;
748    }
749
750    @Override
751    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
752        ConnectionId connectionId = id.getParentId();
753        TransportConnectionState cs = lookupConnectionState(connectionId);
754        if (cs == null) {
755            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
756        }
757        SessionState session = cs.getSessionState(id);
758        if (session == null) {
759            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
760        }
761        // Don't let new consumers or producers get added while we are closing
762        // this down.
763        session.shutdown();
764        // Cascade the connection stop to the consumers and producers.
765        for (ConsumerId consumerId : session.getConsumerIds()) {
766            try {
767                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
768            } catch (Throwable e) {
769                LOG.warn("Failed to remove consumer: {}", consumerId, e);
770            }
771        }
772        for (ProducerId producerId : session.getProducerIds()) {
773            try {
774                processRemoveProducer(producerId);
775            } catch (Throwable e) {
776                LOG.warn("Failed to remove producer: {}", producerId, e);
777            }
778        }
779        cs.removeSession(id);
780        broker.removeSession(cs.getContext(), session.getInfo());
781        return null;
782    }
783
784    @Override
785    public Response processAddConnection(ConnectionInfo info) throws Exception {
786        // Older clients should have been defaulting this field to true.. but
787        // they were not.
788        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
789            info.setClientMaster(true);
790        }
791        TransportConnectionState state;
792        // Make sure 2 concurrent connections by the same ID only generate 1
793        // TransportConnectionState object.
794        synchronized (brokerConnectionStates) {
795            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
796            if (state == null) {
797                state = new TransportConnectionState(info, this);
798                brokerConnectionStates.put(info.getConnectionId(), state);
799            }
800            state.incrementReference();
801        }
802        // If there are 2 concurrent connections for the same connection id,
803        // then last one in wins, we need to sync here
804        // to figure out the winner.
805        synchronized (state.getConnectionMutex()) {
806            if (state.getConnection() != this) {
807                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
808                state.getConnection().stop();
809                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
810                state.setConnection(this);
811                state.reset(info);
812            }
813        }
814        registerConnectionState(info.getConnectionId(), state);
815        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
816        this.faultTolerantConnection = info.isFaultTolerant();
817        // Setup the context.
818        String clientId = info.getClientId();
819        context = new ConnectionContext();
820        context.setBroker(broker);
821        context.setClientId(clientId);
822        context.setClientMaster(info.isClientMaster());
823        context.setConnection(this);
824        context.setConnectionId(info.getConnectionId());
825        context.setConnector(connector);
826        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
827        context.setNetworkConnection(networkConnection);
828        context.setFaultTolerant(faultTolerantConnection);
829        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
830        context.setUserName(info.getUserName());
831        context.setWireFormatInfo(wireFormatInfo);
832        context.setReconnect(info.isFailoverReconnect());
833        this.manageable = info.isManageable();
834        context.setConnectionState(state);
835        state.setContext(context);
836        state.setConnection(this);
837        if (info.getClientIp() == null) {
838            info.setClientIp(getRemoteAddress());
839        }
840
841        try {
842            broker.addConnection(context, info);
843        } catch (Exception e) {
844            synchronized (brokerConnectionStates) {
845                brokerConnectionStates.remove(info.getConnectionId());
846            }
847            unregisterConnectionState(info.getConnectionId());
848            LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e);
849            if (e instanceof SecurityException) {
850                // close this down - in case the peer of this transport doesn't play nice
851                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
852            }
853            throw e;
854        }
855        if (info.isManageable()) {
856            // send ConnectionCommand
857            ConnectionControl command = this.connector.getConnectionControl();
858            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
859            if (info.isFailoverReconnect()) {
860                command.setRebalanceConnection(false);
861            }
862            dispatchAsync(command);
863        }
864        return null;
865    }
866
867    @Override
868    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
869            throws InterruptedException {
870        LOG.debug("remove connection id: {}", id);
871        TransportConnectionState cs = lookupConnectionState(id);
872        if (cs != null) {
873            // Don't allow things to be added to the connection state while we
874            // are shutting down.
875            cs.shutdown();
876            // Cascade the connection stop to the sessions.
877            for (SessionId sessionId : cs.getSessionIds()) {
878                try {
879                    processRemoveSession(sessionId, lastDeliveredSequenceId);
880                } catch (Throwable e) {
881                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
882                }
883            }
884            // Cascade the connection stop to temp destinations.
885            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
886                DestinationInfo di = iter.next();
887                try {
888                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
889                } catch (Throwable e) {
890                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
891                }
892                iter.remove();
893            }
894            try {
895                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
896            } catch (Throwable e) {
897                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
898            }
899            TransportConnectionState state = unregisterConnectionState(id);
900            if (state != null) {
901                synchronized (brokerConnectionStates) {
902                    // If we are the last reference, we should remove the state
903                    // from the broker.
904                    if (state.decrementReference() == 0) {
905                        brokerConnectionStates.remove(id);
906                    }
907                }
908            }
909        }
910        return null;
911    }
912
913    @Override
914    public Response processProducerAck(ProducerAck ack) throws Exception {
915        // A broker should not get ProducerAck messages.
916        return null;
917    }
918
919    @Override
920    public Connector getConnector() {
921        return connector;
922    }
923
924    @Override
925    public void dispatchSync(Command message) {
926        try {
927            processDispatch(message);
928        } catch (IOException e) {
929            serviceExceptionAsync(e);
930        }
931    }
932
933    @Override
934    public void dispatchAsync(Command message) {
935        if (!stopping.get()) {
936            if (taskRunner == null) {
937                dispatchSync(message);
938            } else {
939                synchronized (dispatchQueue) {
940                    dispatchQueue.add(message);
941                }
942                try {
943                    taskRunner.wakeup();
944                } catch (InterruptedException e) {
945                    Thread.currentThread().interrupt();
946                }
947            }
948        } else {
949            if (message.isMessageDispatch()) {
950                MessageDispatch md = (MessageDispatch) message;
951                TransmitCallback sub = md.getTransmitCallback();
952                broker.postProcessDispatch(md);
953                if (sub != null) {
954                    sub.onFailure();
955                }
956            }
957        }
958    }
959
960    protected void processDispatch(Command command) throws IOException {
961        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
962        try {
963            if (!stopping.get()) {
964                if (messageDispatch != null) {
965                    try {
966                        broker.preProcessDispatch(messageDispatch);
967                    } catch (RuntimeException convertToIO) {
968                        throw new IOException(convertToIO);
969                    }
970                }
971                dispatch(command);
972            }
973        } catch (IOException e) {
974            if (messageDispatch != null) {
975                TransmitCallback sub = messageDispatch.getTransmitCallback();
976                broker.postProcessDispatch(messageDispatch);
977                if (sub != null) {
978                    sub.onFailure();
979                }
980                messageDispatch = null;
981                throw e;
982            } else {
983                if (TRANSPORTLOG.isDebugEnabled()) {
984                    TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: " + command.getDataStructureType(), e);
985                }
986            }
987        } finally {
988            if (messageDispatch != null) {
989                TransmitCallback sub = messageDispatch.getTransmitCallback();
990                broker.postProcessDispatch(messageDispatch);
991                if (sub != null) {
992                    sub.onSuccess();
993                }
994            }
995        }
996    }
997
998    @Override
999    public boolean iterate() {
1000        try {
1001            if (pendingStop.get() || stopping.get()) {
1002                if (dispatchStopped.compareAndSet(false, true)) {
1003                    if (transportException.get() == null) {
1004                        try {
1005                            dispatch(new ShutdownInfo());
1006                        } catch (Throwable ignore) {
1007                        }
1008                    }
1009                    dispatchStoppedLatch.countDown();
1010                }
1011                return false;
1012            }
1013            if (!dispatchStopped.get()) {
1014                Command command = null;
1015                synchronized (dispatchQueue) {
1016                    if (dispatchQueue.isEmpty()) {
1017                        return false;
1018                    }
1019                    command = dispatchQueue.remove(0);
1020                }
1021                processDispatch(command);
1022                return true;
1023            }
1024            return false;
1025        } catch (IOException e) {
1026            if (dispatchStopped.compareAndSet(false, true)) {
1027                dispatchStoppedLatch.countDown();
1028            }
1029            serviceExceptionAsync(e);
1030            return false;
1031        }
1032    }
1033
1034    /**
1035     * Returns the statistics for this connection
1036     */
1037    @Override
1038    public ConnectionStatistics getStatistics() {
1039        return statistics;
1040    }
1041
1042    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1043        return messageAuthorizationPolicy;
1044    }
1045
1046    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1047        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1048    }
1049
1050    @Override
1051    public boolean isManageable() {
1052        return manageable;
1053    }
1054
1055    @Override
1056    public void start() throws Exception {
1057        try {
1058            synchronized (this) {
1059                starting.set(true);
1060                if (taskRunnerFactory != null) {
1061                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
1062                            + getRemoteAddress());
1063                } else {
1064                    taskRunner = null;
1065                }
1066                transport.start();
1067                active = true;
1068                BrokerInfo info = connector.getBrokerInfo().copy();
1069                if (connector.isUpdateClusterClients()) {
1070                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
1071                } else {
1072                    info.setPeerBrokerInfos(null);
1073                }
1074                dispatchAsync(info);
1075
1076                connector.onStarted(this);
1077            }
1078        } catch (Exception e) {
1079            // Force clean up on an error starting up.
1080            pendingStop.set(true);
1081            throw e;
1082        } finally {
1083            // stop() can be called from within the above block,
1084            // but we want to be sure start() completes before
1085            // stop() runs, so queue the stop until right now:
1086            setStarting(false);
1087            if (isPendingStop()) {
1088                LOG.debug("Calling the delayed stop() after start() {}", this);
1089                stop();
1090            }
1091        }
1092    }
1093
1094    @Override
1095    public void stop() throws Exception {
1096        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1097        // as their lifecycle is handled elsewhere
1098
1099        stopAsync();
1100        while (!stopped.await(5, TimeUnit.SECONDS)) {
1101            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1102        }
1103    }
1104
1105    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1106        if (waitTime > 0) {
1107            synchronized (this) {
1108                pendingStop.set(true);
1109                transportException.set(cause);
1110            }
1111            try {
1112                stopTaskRunnerFactory.execute(new Runnable() {
1113                    @Override
1114                    public void run() {
1115                        try {
1116                            Thread.sleep(waitTime);
1117                            stopAsync();
1118                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1119                        } catch (InterruptedException e) {
1120                        }
1121                    }
1122                });
1123            } catch (Throwable t) {
1124                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1125            }
1126        }
1127    }
1128
1129    public void stopAsync(Throwable cause) {
1130        transportException.set(cause);
1131        stopAsync();
1132    }
1133
1134    public void stopAsync() {
1135        // If we're in the middle of starting then go no further... for now.
1136        synchronized (this) {
1137            pendingStop.set(true);
1138            if (starting.get()) {
1139                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1140                return;
1141            }
1142        }
1143        if (stopping.compareAndSet(false, true)) {
1144            // Let all the connection contexts know we are shutting down
1145            // so that in progress operations can notice and unblock.
1146            List<TransportConnectionState> connectionStates = listConnectionStates();
1147            for (TransportConnectionState cs : connectionStates) {
1148                ConnectionContext connectionContext = cs.getContext();
1149                if (connectionContext != null) {
1150                    connectionContext.getStopping().set(true);
1151                }
1152            }
1153            try {
1154                stopTaskRunnerFactory.execute(new Runnable() {
1155                    @Override
1156                    public void run() {
1157                        serviceLock.writeLock().lock();
1158                        try {
1159                            doStop();
1160                        } catch (Throwable e) {
1161                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1162                        } finally {
1163                            stopped.countDown();
1164                            serviceLock.writeLock().unlock();
1165                        }
1166                    }
1167                });
1168            } catch (Throwable t) {
1169                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1170                stopped.countDown();
1171            }
1172        }
1173    }
1174
1175    @Override
1176    public String toString() {
1177        return "Transport Connection to: " + transport.getRemoteAddress();
1178    }
1179
1180    protected void doStop() throws Exception {
1181        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1182        connector.onStopped(this);
1183        try {
1184            synchronized (this) {
1185                if (duplexBridge != null) {
1186                    duplexBridge.stop();
1187                }
1188            }
1189        } catch (Exception ignore) {
1190            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1191        }
1192        try {
1193            transport.stop();
1194            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1195        } catch (Exception e) {
1196            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1197        }
1198        if (taskRunner != null) {
1199            taskRunner.shutdown(1);
1200            taskRunner = null;
1201        }
1202        active = false;
1203        // Run the MessageDispatch callbacks so that message references get
1204        // cleaned up.
1205        synchronized (dispatchQueue) {
1206            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1207                Command command = iter.next();
1208                if (command.isMessageDispatch()) {
1209                    MessageDispatch md = (MessageDispatch) command;
1210                    TransmitCallback sub = md.getTransmitCallback();
1211                    broker.postProcessDispatch(md);
1212                    if (sub != null) {
1213                        sub.onFailure();
1214                    }
1215                }
1216            }
1217            dispatchQueue.clear();
1218        }
1219        //
1220        // Remove all logical connection associated with this connection
1221        // from the broker.
1222        if (!broker.isStopped()) {
1223            List<TransportConnectionState> connectionStates = listConnectionStates();
1224            connectionStates = listConnectionStates();
1225            for (TransportConnectionState cs : connectionStates) {
1226                cs.getContext().getStopping().set(true);
1227                try {
1228                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1229                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1230                } catch (Throwable ignore) {
1231                    ignore.printStackTrace();
1232                }
1233            }
1234        }
1235        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1236    }
1237
1238    /**
1239     * @return Returns the blockedCandidate.
1240     */
1241    public boolean isBlockedCandidate() {
1242        return blockedCandidate;
1243    }
1244
1245    /**
1246     * @param blockedCandidate The blockedCandidate to set.
1247     */
1248    public void setBlockedCandidate(boolean blockedCandidate) {
1249        this.blockedCandidate = blockedCandidate;
1250    }
1251
1252    /**
1253     * @return Returns the markedCandidate.
1254     */
1255    public boolean isMarkedCandidate() {
1256        return markedCandidate;
1257    }
1258
1259    /**
1260     * @param markedCandidate The markedCandidate to set.
1261     */
1262    public void setMarkedCandidate(boolean markedCandidate) {
1263        this.markedCandidate = markedCandidate;
1264        if (!markedCandidate) {
1265            timeStamp = 0;
1266            blockedCandidate = false;
1267        }
1268    }
1269
1270    /**
1271     * @param slow The slow to set.
1272     */
1273    public void setSlow(boolean slow) {
1274        this.slow = slow;
1275    }
1276
1277    /**
1278     * @return true if the Connection is slow
1279     */
1280    @Override
1281    public boolean isSlow() {
1282        return slow;
1283    }
1284
1285    /**
1286     * @return true if the Connection is potentially blocked
1287     */
1288    public boolean isMarkedBlockedCandidate() {
1289        return markedCandidate;
1290    }
1291
1292    /**
1293     * Mark the Connection, so we can deem if it's collectable on the next sweep
1294     */
1295    public void doMark() {
1296        if (timeStamp == 0) {
1297            timeStamp = System.currentTimeMillis();
1298        }
1299    }
1300
1301    /**
1302     * @return if after being marked, the Connection is still writing
1303     */
1304    @Override
1305    public boolean isBlocked() {
1306        return blocked;
1307    }
1308
1309    /**
1310     * @return true if the Connection is connected
1311     */
1312    @Override
1313    public boolean isConnected() {
1314        return connected;
1315    }
1316
1317    /**
1318     * @param blocked The blocked to set.
1319     */
1320    public void setBlocked(boolean blocked) {
1321        this.blocked = blocked;
1322    }
1323
1324    /**
1325     * @param connected The connected to set.
1326     */
1327    public void setConnected(boolean connected) {
1328        this.connected = connected;
1329    }
1330
1331    /**
1332     * @return true if the Connection is active
1333     */
1334    @Override
1335    public boolean isActive() {
1336        return active;
1337    }
1338
1339    /**
1340     * @param active The active to set.
1341     */
1342    public void setActive(boolean active) {
1343        this.active = active;
1344    }
1345
1346    /**
1347     * @return true if the Connection is starting
1348     */
1349    public boolean isStarting() {
1350        return starting.get();
1351    }
1352
1353    @Override
1354    public synchronized boolean isNetworkConnection() {
1355        return networkConnection;
1356    }
1357
1358    @Override
1359    public boolean isFaultTolerantConnection() {
1360        return this.faultTolerantConnection;
1361    }
1362
1363    protected void setStarting(boolean starting) {
1364        this.starting.set(starting);
1365    }
1366
1367    /**
1368     * @return true if the Connection needs to stop
1369     */
1370    public boolean isPendingStop() {
1371        return pendingStop.get();
1372    }
1373
1374    protected void setPendingStop(boolean pendingStop) {
1375        this.pendingStop.set(pendingStop);
1376    }
1377
1378    @Override
1379    public Response processBrokerInfo(BrokerInfo info) {
1380        if (info.isSlaveBroker()) {
1381            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1382        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1383            // so this TransportConnection is the rear end of a network bridge
1384            // We have been requested to create a two way pipe ...
1385            try {
1386                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1387                Map<String, String> props = createMap(properties);
1388                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1389                IntrospectionSupport.setProperties(config, props, "");
1390                config.setBrokerName(broker.getBrokerName());
1391
1392                // check for existing duplex connection hanging about
1393
1394                // We first look if existing network connection already exists for the same broker Id and network connector name
1395                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1396                // and the duplex network connector side wanting to open a new one
1397                // In this case, the old connection must be broken
1398                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1399                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1400                synchronized (connections) {
1401                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1402                        TransportConnection c = iter.next();
1403                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1404                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1405                            c.stopAsync();
1406                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1407                            c.getStopped().await(1, TimeUnit.SECONDS);
1408                        }
1409                    }
1410                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1411                }
1412                Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
1413                Transport remoteBridgeTransport = transport;
1414                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1415                    // the vm transport case is already wrapped
1416                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1417                }
1418                String duplexName = localTransport.toString();
1419                if (duplexName.contains("#")) {
1420                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1421                }
1422                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
1423                listener.setCreatedByDuplex(true);
1424                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1425                duplexBridge.setBrokerService(brokerService);
1426                // now turn duplex off this side
1427                info.setDuplexConnection(false);
1428                duplexBridge.setCreatedByDuplex(true);
1429                duplexBridge.duplexStart(this, brokerInfo, info);
1430                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1431                return null;
1432            } catch (TransportDisposedIOException e) {
1433                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1434                return null;
1435            } catch (Exception e) {
1436                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1437                return null;
1438            }
1439        }
1440        // We only expect to get one broker info command per connection
1441        if (this.brokerInfo != null) {
1442            LOG.warn("Unexpected extra broker info command received: {}", info);
1443        }
1444        this.brokerInfo = info;
1445        networkConnection = true;
1446        List<TransportConnectionState> connectionStates = listConnectionStates();
1447        for (TransportConnectionState cs : connectionStates) {
1448            cs.getContext().setNetworkConnection(true);
1449        }
1450        return null;
1451    }
1452
1453    @SuppressWarnings({"unchecked", "rawtypes"})
1454    private HashMap<String, String> createMap(Properties properties) {
1455        return new HashMap(properties);
1456    }
1457
1458    protected void dispatch(Command command) throws IOException {
1459        try {
1460            setMarkedCandidate(true);
1461            transport.oneway(command);
1462        } finally {
1463            setMarkedCandidate(false);
1464        }
1465    }
1466
1467    @Override
1468    public String getRemoteAddress() {
1469        return transport.getRemoteAddress();
1470    }
1471
1472    public Transport getTransport() {
1473        return transport;
1474    }
1475
1476    @Override
1477    public String getConnectionId() {
1478        List<TransportConnectionState> connectionStates = listConnectionStates();
1479        for (TransportConnectionState cs : connectionStates) {
1480            if (cs.getInfo().getClientId() != null) {
1481                return cs.getInfo().getClientId();
1482            }
1483            return cs.getInfo().getConnectionId().toString();
1484        }
1485        return null;
1486    }
1487
1488    @Override
1489    public void updateClient(ConnectionControl control) {
1490        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1491                && this.wireFormatInfo.getVersion() >= 6) {
1492            dispatchAsync(control);
1493        }
1494    }
1495
1496    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1497        ProducerBrokerExchange result = null;
1498        if (producerInfo != null && producerInfo.getProducerId() != null){
1499            synchronized (producerExchanges){
1500                result = producerExchanges.get(producerInfo.getProducerId());
1501            }
1502        }
1503        return result;
1504    }
1505
1506    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1507        ProducerBrokerExchange result = producerExchanges.get(id);
1508        if (result == null) {
1509            synchronized (producerExchanges) {
1510                result = new ProducerBrokerExchange();
1511                TransportConnectionState state = lookupConnectionState(id);
1512                context = state.getContext();
1513                result.setConnectionContext(context);
1514                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1515                    result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
1516                }
1517                SessionState ss = state.getSessionState(id.getParentId());
1518                if (ss != null) {
1519                    result.setProducerState(ss.getProducerState(id));
1520                    ProducerState producerState = ss.getProducerState(id);
1521                    if (producerState != null && producerState.getInfo() != null) {
1522                        ProducerInfo info = producerState.getInfo();
1523                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1524                    }
1525                }
1526                producerExchanges.put(id, result);
1527            }
1528        } else {
1529            context = result.getConnectionContext();
1530        }
1531        return result;
1532    }
1533
1534    private void removeProducerBrokerExchange(ProducerId id) {
1535        synchronized (producerExchanges) {
1536            producerExchanges.remove(id);
1537        }
1538    }
1539
1540    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1541        ConsumerBrokerExchange result = consumerExchanges.get(id);
1542        return result;
1543    }
1544
1545    private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
1546        ConsumerBrokerExchange result = consumerExchanges.get(id);
1547        if (result == null) {
1548            synchronized (consumerExchanges) {
1549                result = new ConsumerBrokerExchange();
1550                context = connectionState.getContext();
1551                result.setConnectionContext(context);
1552                SessionState ss = connectionState.getSessionState(id.getParentId());
1553                if (ss != null) {
1554                    ConsumerState cs = ss.getConsumerState(id);
1555                    if (cs != null) {
1556                        ConsumerInfo info = cs.getInfo();
1557                        if (info != null) {
1558                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1559                                result.setWildcard(true);
1560                            }
1561                        }
1562                    }
1563                }
1564                consumerExchanges.put(id, result);
1565            }
1566        }
1567        return result;
1568    }
1569
1570    private void removeConsumerBrokerExchange(ConsumerId id) {
1571        synchronized (consumerExchanges) {
1572            consumerExchanges.remove(id);
1573        }
1574    }
1575
1576    public int getProtocolVersion() {
1577        return protocolVersion.get();
1578    }
1579
1580    @Override
1581    public Response processControlCommand(ControlCommand command) throws Exception {
1582        return null;
1583    }
1584
1585    @Override
1586    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1587        return null;
1588    }
1589
1590    @Override
1591    public Response processConnectionControl(ConnectionControl control) throws Exception {
1592        if (control != null) {
1593            faultTolerantConnection = control.isFaultTolerant();
1594        }
1595        return null;
1596    }
1597
1598    @Override
1599    public Response processConnectionError(ConnectionError error) throws Exception {
1600        return null;
1601    }
1602
1603    @Override
1604    public Response processConsumerControl(ConsumerControl control) throws Exception {
1605        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1606        broker.processConsumerControl(consumerExchange, control);
1607        return null;
1608    }
1609
1610    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1611                                                                            TransportConnectionState state) {
1612        TransportConnectionState cs = null;
1613        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1614            // swap implementations
1615            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1616            newRegister.intialize(connectionStateRegister);
1617            connectionStateRegister = newRegister;
1618        }
1619        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1620        return cs;
1621    }
1622
1623    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1624        return connectionStateRegister.unregisterConnectionState(connectionId);
1625    }
1626
1627    protected synchronized List<TransportConnectionState> listConnectionStates() {
1628        return connectionStateRegister.listConnectionStates();
1629    }
1630
1631    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1632        return connectionStateRegister.lookupConnectionState(connectionId);
1633    }
1634
1635    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1636        return connectionStateRegister.lookupConnectionState(id);
1637    }
1638
1639    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1640        return connectionStateRegister.lookupConnectionState(id);
1641    }
1642
1643    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1644        return connectionStateRegister.lookupConnectionState(id);
1645    }
1646
1647    // public only for testing
1648    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1649        return connectionStateRegister.lookupConnectionState(connectionId);
1650    }
1651
1652    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1653        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1654    }
1655
1656    protected synchronized String getDuplexNetworkConnectorId() {
1657        return this.duplexNetworkConnectorId;
1658    }
1659
1660    public boolean isStopping() {
1661        return stopping.get();
1662    }
1663
1664    protected CountDownLatch getStopped() {
1665        return stopped;
1666    }
1667
1668    private int getProducerCount(ConnectionId connectionId) {
1669        int result = 0;
1670        TransportConnectionState cs = lookupConnectionState(connectionId);
1671        if (cs != null) {
1672            for (SessionId sessionId : cs.getSessionIds()) {
1673                SessionState sessionState = cs.getSessionState(sessionId);
1674                if (sessionState != null) {
1675                    result += sessionState.getProducerIds().size();
1676                }
1677            }
1678        }
1679        return result;
1680    }
1681
1682    private int getConsumerCount(ConnectionId connectionId) {
1683        int result = 0;
1684        TransportConnectionState cs = lookupConnectionState(connectionId);
1685        if (cs != null) {
1686            for (SessionId sessionId : cs.getSessionIds()) {
1687                SessionState sessionState = cs.getSessionState(sessionId);
1688                if (sessionState != null) {
1689                    result += sessionState.getConsumerIds().size();
1690                }
1691            }
1692        }
1693        return result;
1694    }
1695
1696    public WireFormatInfo getRemoteWireFormatInfo() {
1697        return wireFormatInfo;
1698    }
1699}