001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Properties;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import javax.transaction.xa.XAResource;
040
041import org.apache.activemq.advisory.AdvisorySupport;
042import org.apache.activemq.broker.region.ConnectionStatistics;
043import org.apache.activemq.broker.region.RegionBroker;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.BrokerInfo;
046import org.apache.activemq.command.Command;
047import org.apache.activemq.command.CommandTypes;
048import org.apache.activemq.command.ConnectionControl;
049import org.apache.activemq.command.ConnectionError;
050import org.apache.activemq.command.ConnectionId;
051import org.apache.activemq.command.ConnectionInfo;
052import org.apache.activemq.command.ConsumerControl;
053import org.apache.activemq.command.ConsumerId;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.ControlCommand;
056import org.apache.activemq.command.DataArrayResponse;
057import org.apache.activemq.command.DestinationInfo;
058import org.apache.activemq.command.ExceptionResponse;
059import org.apache.activemq.command.FlushCommand;
060import org.apache.activemq.command.IntegerResponse;
061import org.apache.activemq.command.KeepAliveInfo;
062import org.apache.activemq.command.Message;
063import org.apache.activemq.command.MessageAck;
064import org.apache.activemq.command.MessageDispatch;
065import org.apache.activemq.command.MessageDispatchNotification;
066import org.apache.activemq.command.MessagePull;
067import org.apache.activemq.command.ProducerAck;
068import org.apache.activemq.command.ProducerId;
069import org.apache.activemq.command.ProducerInfo;
070import org.apache.activemq.command.RemoveInfo;
071import org.apache.activemq.command.RemoveSubscriptionInfo;
072import org.apache.activemq.command.Response;
073import org.apache.activemq.command.SessionId;
074import org.apache.activemq.command.SessionInfo;
075import org.apache.activemq.command.ShutdownInfo;
076import org.apache.activemq.command.TransactionId;
077import org.apache.activemq.command.TransactionInfo;
078import org.apache.activemq.command.WireFormatInfo;
079import org.apache.activemq.network.DemandForwardingBridge;
080import org.apache.activemq.network.MBeanNetworkListener;
081import org.apache.activemq.network.NetworkBridgeConfiguration;
082import org.apache.activemq.network.NetworkBridgeFactory;
083import org.apache.activemq.security.MessageAuthorizationPolicy;
084import org.apache.activemq.state.CommandVisitor;
085import org.apache.activemq.state.ConnectionState;
086import org.apache.activemq.state.ConsumerState;
087import org.apache.activemq.state.ProducerState;
088import org.apache.activemq.state.SessionState;
089import org.apache.activemq.state.TransactionState;
090import org.apache.activemq.thread.Task;
091import org.apache.activemq.thread.TaskRunner;
092import org.apache.activemq.thread.TaskRunnerFactory;
093import org.apache.activemq.transaction.Transaction;
094import org.apache.activemq.transport.DefaultTransportListener;
095import org.apache.activemq.transport.ResponseCorrelator;
096import org.apache.activemq.transport.TransmitCallback;
097import org.apache.activemq.transport.Transport;
098import org.apache.activemq.transport.TransportDisposedIOException;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.MarshallingSupport;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103import org.slf4j.MDC;
104
105public class TransportConnection implements Connection, Task, CommandVisitor {
106    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
107    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
108    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
109    // Keeps track of the broker and connector that created this connection.
110    protected final Broker broker;
111    protected final BrokerService brokerService;
112    protected final TransportConnector connector;
113    // Keeps track of the state of the connections.
114    // protected final ConcurrentHashMap localConnectionStates=new
115    // ConcurrentHashMap();
116    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
117    // The broker and wireformat info that was exchanged.
118    protected BrokerInfo brokerInfo;
119    protected final List<Command> dispatchQueue = new LinkedList<Command>();
120    protected TaskRunner taskRunner;
121    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
122    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
123    private final Transport transport;
124    private MessageAuthorizationPolicy messageAuthorizationPolicy;
125    private WireFormatInfo wireFormatInfo;
126    // Used to do async dispatch.. this should perhaps be pushed down into the
127    // transport layer..
128    private boolean inServiceException;
129    private final ConnectionStatistics statistics = new ConnectionStatistics();
130    private boolean manageable;
131    private boolean slow;
132    private boolean markedCandidate;
133    private boolean blockedCandidate;
134    private boolean blocked;
135    private boolean connected;
136    private boolean active;
137    private final AtomicBoolean starting = new AtomicBoolean();
138    private final AtomicBoolean pendingStop = new AtomicBoolean();
139    private long timeStamp;
140    private final AtomicBoolean stopping = new AtomicBoolean(false);
141    private final CountDownLatch stopped = new CountDownLatch(1);
142    private final AtomicBoolean asyncException = new AtomicBoolean(false);
143    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
144    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
145    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
146    private ConnectionContext context;
147    private boolean networkConnection;
148    private boolean faultTolerantConnection;
149    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
150    private DemandForwardingBridge duplexBridge;
151    private final TaskRunnerFactory taskRunnerFactory;
152    private final TaskRunnerFactory stopTaskRunnerFactory;
153    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
154    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
155    private String duplexNetworkConnectorId;
156
157    /**
158     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
159     *                          else commands are sent async.
160     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
161     */
162    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
163                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
164        this.connector = connector;
165        this.broker = broker;
166        this.brokerService = broker.getBrokerService();
167
168        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
169        brokerConnectionStates = rb.getConnectionStates();
170        if (connector != null) {
171            this.statistics.setParent(connector.getStatistics());
172            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
173        }
174        this.taskRunnerFactory = taskRunnerFactory;
175        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
176        this.transport = transport;
177        if( this.transport instanceof BrokerServiceAware ) {
178            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
179        }
180        this.transport.setTransportListener(new DefaultTransportListener() {
181            @Override
182            public void onCommand(Object o) {
183                serviceLock.readLock().lock();
184                try {
185                    if (!(o instanceof Command)) {
186                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
187                    }
188                    Command command = (Command) o;
189                    if (!brokerService.isStopping()) {
190                        Response response = service(command);
191                        if (response != null && !brokerService.isStopping()) {
192                            dispatchSync(response);
193                        }
194                    } else {
195                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
196                    }
197                } finally {
198                    serviceLock.readLock().unlock();
199                }
200            }
201
202            @Override
203            public void onException(IOException exception) {
204                serviceLock.readLock().lock();
205                try {
206                    serviceTransportException(exception);
207                } finally {
208                    serviceLock.readLock().unlock();
209                }
210            }
211        });
212        connected = true;
213    }
214
215    /**
216     * Returns the number of messages to be dispatched to this connection
217     *
218     * @return size of dispatch queue
219     */
220    @Override
221    public int getDispatchQueueSize() {
222        synchronized (dispatchQueue) {
223            return dispatchQueue.size();
224        }
225    }
226
227    public void serviceTransportException(IOException e) {
228        if (!stopping.get() && !pendingStop.get()) {
229            transportException.set(e);
230            if (TRANSPORTLOG.isDebugEnabled()) {
231                TRANSPORTLOG.debug(this + " failed: " + e, e);
232            } else if (TRANSPORTLOG.isWarnEnabled() && !suppressed(e)) {
233                TRANSPORTLOG.warn(this + " failed: " + e);
234            }
235            stopAsync(e);
236        }
237    }
238
239    private boolean suppressed(IOException e) {
240        return !connector.isWarnOnRemoteClose() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
241    }
242
243    /**
244     * Calls the serviceException method in an async thread. Since handling a
245     * service exception closes a socket, we should not tie up broker threads
246     * since client sockets may hang or cause deadlocks.
247     */
248    @Override
249    public void serviceExceptionAsync(final IOException e) {
250        if (asyncException.compareAndSet(false, true)) {
251            new Thread("Async Exception Handler") {
252                @Override
253                public void run() {
254                    serviceException(e);
255                }
256            }.start();
257        }
258    }
259
260    /**
261     * Closes a clients connection due to a detected error. Errors are ignored
262     * if: the client is closing or broker is closing. Otherwise, the connection
263     * error transmitted to the client before stopping it's transport.
264     */
265    @Override
266    public void serviceException(Throwable e) {
267        // are we a transport exception such as not being able to dispatch
268        // synchronously to a transport
269        if (e instanceof IOException) {
270            serviceTransportException((IOException) e);
271        } else if (e.getClass() == BrokerStoppedException.class) {
272            // Handle the case where the broker is stopped
273            // But the client is still connected.
274            if (!stopping.get()) {
275                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
276                ConnectionError ce = new ConnectionError();
277                ce.setException(e);
278                dispatchSync(ce);
279                // Record the error that caused the transport to stop
280                transportException.set(e);
281                // Wait a little bit to try to get the output buffer to flush
282                // the exception notification to the client.
283                try {
284                    Thread.sleep(500);
285                } catch (InterruptedException ie) {
286                    Thread.currentThread().interrupt();
287                }
288                // Worst case is we just kill the connection before the
289                // notification gets to him.
290                stopAsync();
291            }
292        } else if (!stopping.get() && !inServiceException) {
293            inServiceException = true;
294            try {
295                if (SERVICELOG.isDebugEnabled()) {
296                    SERVICELOG.debug("Async error occurred: " + e, e);
297                } else {
298                    SERVICELOG.warn("Async error occurred: " + e);
299                }
300                ConnectionError ce = new ConnectionError();
301                ce.setException(e);
302                if (pendingStop.get()) {
303                    dispatchSync(ce);
304                } else {
305                    dispatchAsync(ce);
306                }
307            } finally {
308                inServiceException = false;
309            }
310        }
311    }
312
313    @Override
314    public Response service(Command command) {
315        MDC.put("activemq.connector", connector.getUri().toString());
316        Response response = null;
317        boolean responseRequired = command.isResponseRequired();
318        int commandId = command.getCommandId();
319        try {
320            if (!pendingStop.get()) {
321                response = command.visit(this);
322            } else {
323                response = new ExceptionResponse(transportException.get());
324            }
325        } catch (Throwable e) {
326            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
327                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
328                        + " command: " + command + ", exception: " + e, e);
329            }
330
331            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
332                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
333                responseRequired = false;
334            }
335
336            if (responseRequired) {
337                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
338                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
339                            transport.getRemoteAddress(), e.getMessage());
340                }
341                response = new ExceptionResponse(e);
342            } else {
343                forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
344                serviceException(e);
345            }
346        }
347        if (responseRequired) {
348            if (response == null) {
349                response = new Response();
350            }
351            response.setCorrelationId(commandId);
352        }
353        // The context may have been flagged so that the response is not
354        // sent.
355        if (context != null) {
356            if (context.isDontSendReponse()) {
357                context.setDontSendReponse(false);
358                response = null;
359            }
360            context = null;
361        }
362        MDC.remove("activemq.connector");
363        return response;
364    }
365
366    private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) {
367        if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
368            Transaction transaction = getActiveTransaction(command);
369            if (transaction != null && !transaction.isRollbackOnly()) {
370                LOG.debug("on async exception, force rollback of transaction for: " + command, e);
371                transaction.setRollbackOnly(e);
372            }
373        }
374    }
375
376    private Transaction getActiveTransaction(Command command) {
377        Transaction transaction = null;
378        try {
379            if (command instanceof Message) {
380                Message messageSend = (Message) command;
381                ProducerId producerId = messageSend.getProducerId();
382                ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
383                transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
384            } else if (command instanceof  MessageAck) {
385                MessageAck messageAck = (MessageAck) command;
386                ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
387                if (consumerExchange != null) {
388                    transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
389                }
390            }
391        } catch(Exception ignored){
392            LOG.trace("failed to find active transaction for command: " + command, ignored);
393        }
394        return transaction;
395    }
396
397    private boolean isInTransaction(Command command) {
398        return command instanceof Message && ((Message)command).isInTransaction()
399                || command instanceof MessageAck && ((MessageAck)command).isInTransaction();
400    }
401
402    @Override
403    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
404        return null;
405    }
406
407    @Override
408    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
409        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
410        return null;
411    }
412
413    @Override
414    public Response processWireFormat(WireFormatInfo info) throws Exception {
415        wireFormatInfo = info;
416        protocolVersion.set(info.getVersion());
417        return null;
418    }
419
420    @Override
421    public Response processShutdown(ShutdownInfo info) throws Exception {
422        stopAsync();
423        return null;
424    }
425
426    @Override
427    public Response processFlush(FlushCommand command) throws Exception {
428        return null;
429    }
430
431    @Override
432    public Response processBeginTransaction(TransactionInfo info) throws Exception {
433        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
434        context = null;
435        if (cs != null) {
436            context = cs.getContext();
437        }
438        if (cs == null) {
439            throw new NullPointerException("Context is null");
440        }
441        // Avoid replaying dup commands
442        if (cs.getTransactionState(info.getTransactionId()) == null) {
443            cs.addTransactionState(info.getTransactionId());
444            broker.beginTransaction(context, info.getTransactionId());
445        }
446        return null;
447    }
448
449    @Override
450    public int getActiveTransactionCount() {
451        int rc = 0;
452        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
453            Collection<TransactionState> transactions = cs.getTransactionStates();
454            for (TransactionState transaction : transactions) {
455                rc++;
456            }
457        }
458        return rc;
459    }
460
461    @Override
462    public Long getOldestActiveTransactionDuration() {
463        TransactionState oldestTX = null;
464        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
465            Collection<TransactionState> transactions = cs.getTransactionStates();
466            for (TransactionState transaction : transactions) {
467                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
468                    oldestTX = transaction;
469                }
470            }
471        }
472        if( oldestTX == null ) {
473            return null;
474        }
475        return System.currentTimeMillis() - oldestTX.getCreatedAt();
476    }
477
478    @Override
479    public Response processEndTransaction(TransactionInfo info) throws Exception {
480        // No need to do anything. This packet is just sent by the client
481        // make sure he is synced with the server as commit command could
482        // come from a different connection.
483        return null;
484    }
485
486    @Override
487    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
488        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
489        context = null;
490        if (cs != null) {
491            context = cs.getContext();
492        }
493        if (cs == null) {
494            throw new NullPointerException("Context is null");
495        }
496        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
497        if (transactionState == null) {
498            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
499                    + info.getTransactionId());
500        }
501        // Avoid dups.
502        if (!transactionState.isPrepared()) {
503            transactionState.setPrepared(true);
504            int result = broker.prepareTransaction(context, info.getTransactionId());
505            transactionState.setPreparedResult(result);
506            if (result == XAResource.XA_RDONLY) {
507                // we are done, no further rollback or commit from TM
508                cs.removeTransactionState(info.getTransactionId());
509            }
510            IntegerResponse response = new IntegerResponse(result);
511            return response;
512        } else {
513            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
514            return response;
515        }
516    }
517
518    @Override
519    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
520        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
521        context = cs.getContext();
522        cs.removeTransactionState(info.getTransactionId());
523        broker.commitTransaction(context, info.getTransactionId(), true);
524        return null;
525    }
526
527    @Override
528    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
529        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
530        context = cs.getContext();
531        cs.removeTransactionState(info.getTransactionId());
532        broker.commitTransaction(context, info.getTransactionId(), false);
533        return null;
534    }
535
536    @Override
537    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
538        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
539        context = cs.getContext();
540        cs.removeTransactionState(info.getTransactionId());
541        broker.rollbackTransaction(context, info.getTransactionId());
542        return null;
543    }
544
545    @Override
546    public Response processForgetTransaction(TransactionInfo info) throws Exception {
547        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
548        context = cs.getContext();
549        broker.forgetTransaction(context, info.getTransactionId());
550        return null;
551    }
552
553    @Override
554    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
555        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
556        context = cs.getContext();
557        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
558        return new DataArrayResponse(preparedTransactions);
559    }
560
561    @Override
562    public Response processMessage(Message messageSend) throws Exception {
563        ProducerId producerId = messageSend.getProducerId();
564        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
565        if (producerExchange.canDispatch(messageSend)) {
566            broker.send(producerExchange, messageSend);
567        }
568        return null;
569    }
570
571    @Override
572    public Response processMessageAck(MessageAck ack) throws Exception {
573        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
574        if (consumerExchange != null) {
575            broker.acknowledge(consumerExchange, ack);
576        } else if (ack.isInTransaction()) {
577            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
578        }
579        return null;
580    }
581
582    @Override
583    public Response processMessagePull(MessagePull pull) throws Exception {
584        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
585    }
586
587    @Override
588    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
589        broker.processDispatchNotification(notification);
590        return null;
591    }
592
593    @Override
594    public Response processAddDestination(DestinationInfo info) throws Exception {
595        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
596        broker.addDestinationInfo(cs.getContext(), info);
597        if (info.getDestination().isTemporary()) {
598            cs.addTempDestination(info);
599        }
600        return null;
601    }
602
603    @Override
604    public Response processRemoveDestination(DestinationInfo info) throws Exception {
605        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
606        broker.removeDestinationInfo(cs.getContext(), info);
607        if (info.getDestination().isTemporary()) {
608            cs.removeTempDestination(info.getDestination());
609        }
610        return null;
611    }
612
613    @Override
614    public Response processAddProducer(ProducerInfo info) throws Exception {
615        SessionId sessionId = info.getProducerId().getParentId();
616        ConnectionId connectionId = sessionId.getParentId();
617        TransportConnectionState cs = lookupConnectionState(connectionId);
618        if (cs == null) {
619            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
620                    + connectionId);
621        }
622        SessionState ss = cs.getSessionState(sessionId);
623        if (ss == null) {
624            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
625                    + sessionId);
626        }
627        // Avoid replaying dup commands
628        if (!ss.getProducerIds().contains(info.getProducerId())) {
629            ActiveMQDestination destination = info.getDestination();
630            // Do not check for null here as it would cause the count of max producers to exclude
631            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
632            // call it from here with a null Destination value.
633            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
634                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
635                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
636                }
637            }
638            broker.addProducer(cs.getContext(), info);
639            try {
640                ss.addProducer(info);
641            } catch (IllegalStateException e) {
642                broker.removeProducer(cs.getContext(), info);
643            }
644
645        }
646        return null;
647    }
648
649    @Override
650    public Response processRemoveProducer(ProducerId id) throws Exception {
651        SessionId sessionId = id.getParentId();
652        ConnectionId connectionId = sessionId.getParentId();
653        TransportConnectionState cs = lookupConnectionState(connectionId);
654        SessionState ss = cs.getSessionState(sessionId);
655        if (ss == null) {
656            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
657                    + sessionId);
658        }
659        ProducerState ps = ss.removeProducer(id);
660        if (ps == null) {
661            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
662        }
663        removeProducerBrokerExchange(id);
664        broker.removeProducer(cs.getContext(), ps.getInfo());
665        return null;
666    }
667
668    @Override
669    public Response processAddConsumer(ConsumerInfo info) throws Exception {
670        SessionId sessionId = info.getConsumerId().getParentId();
671        ConnectionId connectionId = sessionId.getParentId();
672        TransportConnectionState cs = lookupConnectionState(connectionId);
673        if (cs == null) {
674            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
675                    + connectionId);
676        }
677        SessionState ss = cs.getSessionState(sessionId);
678        if (ss == null) {
679            throw new IllegalStateException(broker.getBrokerName()
680                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
681        }
682        // Avoid replaying dup commands
683        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
684            ActiveMQDestination destination = info.getDestination();
685            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
686                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
687                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
688                }
689            }
690
691            broker.addConsumer(cs.getContext(), info);
692            try {
693                ss.addConsumer(info);
694                addConsumerBrokerExchange(cs, info.getConsumerId());
695            } catch (IllegalStateException e) {
696                broker.removeConsumer(cs.getContext(), info);
697            }
698
699        }
700        return null;
701    }
702
703    @Override
704    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
705        SessionId sessionId = id.getParentId();
706        ConnectionId connectionId = sessionId.getParentId();
707        TransportConnectionState cs = lookupConnectionState(connectionId);
708        if (cs == null) {
709            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
710                    + connectionId);
711        }
712        SessionState ss = cs.getSessionState(sessionId);
713        if (ss == null) {
714            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
715                    + sessionId);
716        }
717        ConsumerState consumerState = ss.removeConsumer(id);
718        if (consumerState == null) {
719            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
720        }
721        ConsumerInfo info = consumerState.getInfo();
722        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
723        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
724        removeConsumerBrokerExchange(id);
725        return null;
726    }
727
728    @Override
729    public Response processAddSession(SessionInfo info) throws Exception {
730        ConnectionId connectionId = info.getSessionId().getParentId();
731        TransportConnectionState cs = lookupConnectionState(connectionId);
732        // Avoid replaying dup commands
733        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
734            broker.addSession(cs.getContext(), info);
735            try {
736                cs.addSession(info);
737            } catch (IllegalStateException e) {
738                e.printStackTrace();
739                broker.removeSession(cs.getContext(), info);
740            }
741        }
742        return null;
743    }
744
745    @Override
746    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
747        ConnectionId connectionId = id.getParentId();
748        TransportConnectionState cs = lookupConnectionState(connectionId);
749        if (cs == null) {
750            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
751        }
752        SessionState session = cs.getSessionState(id);
753        if (session == null) {
754            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
755        }
756        // Don't let new consumers or producers get added while we are closing
757        // this down.
758        session.shutdown();
759        // Cascade the connection stop to the consumers and producers.
760        for (ConsumerId consumerId : session.getConsumerIds()) {
761            try {
762                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
763            } catch (Throwable e) {
764                LOG.warn("Failed to remove consumer: {}", consumerId, e);
765            }
766        }
767        for (ProducerId producerId : session.getProducerIds()) {
768            try {
769                processRemoveProducer(producerId);
770            } catch (Throwable e) {
771                LOG.warn("Failed to remove producer: {}", producerId, e);
772            }
773        }
774        cs.removeSession(id);
775        broker.removeSession(cs.getContext(), session.getInfo());
776        return null;
777    }
778
779    @Override
780    public Response processAddConnection(ConnectionInfo info) throws Exception {
781        // Older clients should have been defaulting this field to true.. but
782        // they were not.
783        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
784            info.setClientMaster(true);
785        }
786        TransportConnectionState state;
787        // Make sure 2 concurrent connections by the same ID only generate 1
788        // TransportConnectionState object.
789        synchronized (brokerConnectionStates) {
790            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
791            if (state == null) {
792                state = new TransportConnectionState(info, this);
793                brokerConnectionStates.put(info.getConnectionId(), state);
794            }
795            state.incrementReference();
796        }
797        // If there are 2 concurrent connections for the same connection id,
798        // then last one in wins, we need to sync here
799        // to figure out the winner.
800        synchronized (state.getConnectionMutex()) {
801            if (state.getConnection() != this) {
802                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
803                state.getConnection().stop();
804                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
805                state.setConnection(this);
806                state.reset(info);
807            }
808        }
809        registerConnectionState(info.getConnectionId(), state);
810        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
811        this.faultTolerantConnection = info.isFaultTolerant();
812        // Setup the context.
813        String clientId = info.getClientId();
814        context = new ConnectionContext();
815        context.setBroker(broker);
816        context.setClientId(clientId);
817        context.setClientMaster(info.isClientMaster());
818        context.setConnection(this);
819        context.setConnectionId(info.getConnectionId());
820        context.setConnector(connector);
821        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
822        context.setNetworkConnection(networkConnection);
823        context.setFaultTolerant(faultTolerantConnection);
824        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
825        context.setUserName(info.getUserName());
826        context.setWireFormatInfo(wireFormatInfo);
827        context.setReconnect(info.isFailoverReconnect());
828        this.manageable = info.isManageable();
829        context.setConnectionState(state);
830        state.setContext(context);
831        state.setConnection(this);
832        if (info.getClientIp() == null) {
833            info.setClientIp(getRemoteAddress());
834        }
835
836        try {
837            broker.addConnection(context, info);
838        } catch (Exception e) {
839            synchronized (brokerConnectionStates) {
840                brokerConnectionStates.remove(info.getConnectionId());
841            }
842            unregisterConnectionState(info.getConnectionId());
843            LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e);
844            if (e instanceof SecurityException) {
845                // close this down - in case the peer of this transport doesn't play nice
846                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
847            }
848            throw e;
849        }
850        if (info.isManageable()) {
851            // send ConnectionCommand
852            ConnectionControl command = this.connector.getConnectionControl();
853            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
854            if (info.isFailoverReconnect()) {
855                command.setRebalanceConnection(false);
856            }
857            dispatchAsync(command);
858        }
859        return null;
860    }
861
862    @Override
863    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
864            throws InterruptedException {
865        LOG.debug("remove connection id: {}", id);
866        TransportConnectionState cs = lookupConnectionState(id);
867        if (cs != null) {
868            // Don't allow things to be added to the connection state while we
869            // are shutting down.
870            cs.shutdown();
871            // Cascade the connection stop to the sessions.
872            for (SessionId sessionId : cs.getSessionIds()) {
873                try {
874                    processRemoveSession(sessionId, lastDeliveredSequenceId);
875                } catch (Throwable e) {
876                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
877                }
878            }
879            // Cascade the connection stop to temp destinations.
880            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
881                DestinationInfo di = iter.next();
882                try {
883                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
884                } catch (Throwable e) {
885                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
886                }
887                iter.remove();
888            }
889            try {
890                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
891            } catch (Throwable e) {
892                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
893            }
894            TransportConnectionState state = unregisterConnectionState(id);
895            if (state != null) {
896                synchronized (brokerConnectionStates) {
897                    // If we are the last reference, we should remove the state
898                    // from the broker.
899                    if (state.decrementReference() == 0) {
900                        brokerConnectionStates.remove(id);
901                    }
902                }
903            }
904        }
905        return null;
906    }
907
908    @Override
909    public Response processProducerAck(ProducerAck ack) throws Exception {
910        // A broker should not get ProducerAck messages.
911        return null;
912    }
913
914    @Override
915    public Connector getConnector() {
916        return connector;
917    }
918
919    @Override
920    public void dispatchSync(Command message) {
921        try {
922            processDispatch(message);
923        } catch (IOException e) {
924            serviceExceptionAsync(e);
925        }
926    }
927
928    @Override
929    public void dispatchAsync(Command message) {
930        if (!stopping.get()) {
931            if (taskRunner == null) {
932                dispatchSync(message);
933            } else {
934                synchronized (dispatchQueue) {
935                    dispatchQueue.add(message);
936                }
937                try {
938                    taskRunner.wakeup();
939                } catch (InterruptedException e) {
940                    Thread.currentThread().interrupt();
941                }
942            }
943        } else {
944            if (message.isMessageDispatch()) {
945                MessageDispatch md = (MessageDispatch) message;
946                TransmitCallback sub = md.getTransmitCallback();
947                broker.postProcessDispatch(md);
948                if (sub != null) {
949                    sub.onFailure();
950                }
951            }
952        }
953    }
954
955    protected void processDispatch(Command command) throws IOException {
956        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
957        try {
958            if (!stopping.get()) {
959                if (messageDispatch != null) {
960                    try {
961                        broker.preProcessDispatch(messageDispatch);
962                    } catch (RuntimeException convertToIO) {
963                        throw new IOException(convertToIO);
964                    }
965                }
966                dispatch(command);
967            }
968        } catch (IOException e) {
969            if (messageDispatch != null) {
970                TransmitCallback sub = messageDispatch.getTransmitCallback();
971                broker.postProcessDispatch(messageDispatch);
972                if (sub != null) {
973                    sub.onFailure();
974                }
975                messageDispatch = null;
976                throw e;
977            } else {
978                if (TRANSPORTLOG.isDebugEnabled()) {
979                    TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: " + command.getDataStructureType(), e);
980                }
981            }
982        } finally {
983            if (messageDispatch != null) {
984                TransmitCallback sub = messageDispatch.getTransmitCallback();
985                broker.postProcessDispatch(messageDispatch);
986                if (sub != null) {
987                    sub.onSuccess();
988                }
989            }
990        }
991    }
992
993    @Override
994    public boolean iterate() {
995        try {
996            if (pendingStop.get() || stopping.get()) {
997                if (dispatchStopped.compareAndSet(false, true)) {
998                    if (transportException.get() == null) {
999                        try {
1000                            dispatch(new ShutdownInfo());
1001                        } catch (Throwable ignore) {
1002                        }
1003                    }
1004                    dispatchStoppedLatch.countDown();
1005                }
1006                return false;
1007            }
1008            if (!dispatchStopped.get()) {
1009                Command command = null;
1010                synchronized (dispatchQueue) {
1011                    if (dispatchQueue.isEmpty()) {
1012                        return false;
1013                    }
1014                    command = dispatchQueue.remove(0);
1015                }
1016                processDispatch(command);
1017                return true;
1018            }
1019            return false;
1020        } catch (IOException e) {
1021            if (dispatchStopped.compareAndSet(false, true)) {
1022                dispatchStoppedLatch.countDown();
1023            }
1024            serviceExceptionAsync(e);
1025            return false;
1026        }
1027    }
1028
1029    /**
1030     * Returns the statistics for this connection
1031     */
1032    @Override
1033    public ConnectionStatistics getStatistics() {
1034        return statistics;
1035    }
1036
1037    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1038        return messageAuthorizationPolicy;
1039    }
1040
1041    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1042        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1043    }
1044
1045    @Override
1046    public boolean isManageable() {
1047        return manageable;
1048    }
1049
1050    @Override
1051    public void start() throws Exception {
1052        try {
1053            synchronized (this) {
1054                starting.set(true);
1055                if (taskRunnerFactory != null) {
1056                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
1057                            + getRemoteAddress());
1058                } else {
1059                    taskRunner = null;
1060                }
1061                transport.start();
1062                active = true;
1063                BrokerInfo info = connector.getBrokerInfo().copy();
1064                if (connector.isUpdateClusterClients()) {
1065                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
1066                } else {
1067                    info.setPeerBrokerInfos(null);
1068                }
1069                dispatchAsync(info);
1070
1071                connector.onStarted(this);
1072            }
1073        } catch (Exception e) {
1074            // Force clean up on an error starting up.
1075            pendingStop.set(true);
1076            throw e;
1077        } finally {
1078            // stop() can be called from within the above block,
1079            // but we want to be sure start() completes before
1080            // stop() runs, so queue the stop until right now:
1081            setStarting(false);
1082            if (isPendingStop()) {
1083                LOG.debug("Calling the delayed stop() after start() {}", this);
1084                stop();
1085            }
1086        }
1087    }
1088
1089    @Override
1090    public void stop() throws Exception {
1091        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1092        // as their lifecycle is handled elsewhere
1093
1094        stopAsync();
1095        while (!stopped.await(5, TimeUnit.SECONDS)) {
1096            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1097        }
1098    }
1099
1100    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1101        if (waitTime > 0) {
1102            synchronized (this) {
1103                pendingStop.set(true);
1104                transportException.set(cause);
1105            }
1106            try {
1107                stopTaskRunnerFactory.execute(new Runnable() {
1108                    @Override
1109                    public void run() {
1110                        try {
1111                            Thread.sleep(waitTime);
1112                            stopAsync();
1113                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1114                        } catch (InterruptedException e) {
1115                        }
1116                    }
1117                });
1118            } catch (Throwable t) {
1119                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1120            }
1121        }
1122    }
1123
1124    public void stopAsync(Throwable cause) {
1125        transportException.set(cause);
1126        stopAsync();
1127    }
1128
1129    public void stopAsync() {
1130        // If we're in the middle of starting then go no further... for now.
1131        synchronized (this) {
1132            pendingStop.set(true);
1133            if (starting.get()) {
1134                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1135                return;
1136            }
1137        }
1138        if (stopping.compareAndSet(false, true)) {
1139            // Let all the connection contexts know we are shutting down
1140            // so that in progress operations can notice and unblock.
1141            List<TransportConnectionState> connectionStates = listConnectionStates();
1142            for (TransportConnectionState cs : connectionStates) {
1143                ConnectionContext connectionContext = cs.getContext();
1144                if (connectionContext != null) {
1145                    connectionContext.getStopping().set(true);
1146                }
1147            }
1148            try {
1149                stopTaskRunnerFactory.execute(new Runnable() {
1150                    @Override
1151                    public void run() {
1152                        serviceLock.writeLock().lock();
1153                        try {
1154                            doStop();
1155                        } catch (Throwable e) {
1156                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1157                        } finally {
1158                            stopped.countDown();
1159                            serviceLock.writeLock().unlock();
1160                        }
1161                    }
1162                });
1163            } catch (Throwable t) {
1164                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1165                stopped.countDown();
1166            }
1167        }
1168    }
1169
1170    @Override
1171    public String toString() {
1172        return "Transport Connection to: " + transport.getRemoteAddress();
1173    }
1174
1175    protected void doStop() throws Exception {
1176        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1177        connector.onStopped(this);
1178        try {
1179            synchronized (this) {
1180                if (duplexBridge != null) {
1181                    duplexBridge.stop();
1182                }
1183            }
1184        } catch (Exception ignore) {
1185            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1186        }
1187        try {
1188            transport.stop();
1189            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1190        } catch (Exception e) {
1191            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1192        }
1193        if (taskRunner != null) {
1194            taskRunner.shutdown(1);
1195            taskRunner = null;
1196        }
1197        active = false;
1198        // Run the MessageDispatch callbacks so that message references get
1199        // cleaned up.
1200        synchronized (dispatchQueue) {
1201            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1202                Command command = iter.next();
1203                if (command.isMessageDispatch()) {
1204                    MessageDispatch md = (MessageDispatch) command;
1205                    TransmitCallback sub = md.getTransmitCallback();
1206                    broker.postProcessDispatch(md);
1207                    if (sub != null) {
1208                        sub.onFailure();
1209                    }
1210                }
1211            }
1212            dispatchQueue.clear();
1213        }
1214        //
1215        // Remove all logical connection associated with this connection
1216        // from the broker.
1217        if (!broker.isStopped()) {
1218            List<TransportConnectionState> connectionStates = listConnectionStates();
1219            connectionStates = listConnectionStates();
1220            for (TransportConnectionState cs : connectionStates) {
1221                cs.getContext().getStopping().set(true);
1222                try {
1223                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1224                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1225                } catch (Throwable ignore) {
1226                    ignore.printStackTrace();
1227                }
1228            }
1229        }
1230        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1231    }
1232
1233    /**
1234     * @return Returns the blockedCandidate.
1235     */
1236    public boolean isBlockedCandidate() {
1237        return blockedCandidate;
1238    }
1239
1240    /**
1241     * @param blockedCandidate The blockedCandidate to set.
1242     */
1243    public void setBlockedCandidate(boolean blockedCandidate) {
1244        this.blockedCandidate = blockedCandidate;
1245    }
1246
1247    /**
1248     * @return Returns the markedCandidate.
1249     */
1250    public boolean isMarkedCandidate() {
1251        return markedCandidate;
1252    }
1253
1254    /**
1255     * @param markedCandidate The markedCandidate to set.
1256     */
1257    public void setMarkedCandidate(boolean markedCandidate) {
1258        this.markedCandidate = markedCandidate;
1259        if (!markedCandidate) {
1260            timeStamp = 0;
1261            blockedCandidate = false;
1262        }
1263    }
1264
1265    /**
1266     * @param slow The slow to set.
1267     */
1268    public void setSlow(boolean slow) {
1269        this.slow = slow;
1270    }
1271
1272    /**
1273     * @return true if the Connection is slow
1274     */
1275    @Override
1276    public boolean isSlow() {
1277        return slow;
1278    }
1279
1280    /**
1281     * @return true if the Connection is potentially blocked
1282     */
1283    public boolean isMarkedBlockedCandidate() {
1284        return markedCandidate;
1285    }
1286
1287    /**
1288     * Mark the Connection, so we can deem if it's collectable on the next sweep
1289     */
1290    public void doMark() {
1291        if (timeStamp == 0) {
1292            timeStamp = System.currentTimeMillis();
1293        }
1294    }
1295
1296    /**
1297     * @return if after being marked, the Connection is still writing
1298     */
1299    @Override
1300    public boolean isBlocked() {
1301        return blocked;
1302    }
1303
1304    /**
1305     * @return true if the Connection is connected
1306     */
1307    @Override
1308    public boolean isConnected() {
1309        return connected;
1310    }
1311
1312    /**
1313     * @param blocked The blocked to set.
1314     */
1315    public void setBlocked(boolean blocked) {
1316        this.blocked = blocked;
1317    }
1318
1319    /**
1320     * @param connected The connected to set.
1321     */
1322    public void setConnected(boolean connected) {
1323        this.connected = connected;
1324    }
1325
1326    /**
1327     * @return true if the Connection is active
1328     */
1329    @Override
1330    public boolean isActive() {
1331        return active;
1332    }
1333
1334    /**
1335     * @param active The active to set.
1336     */
1337    public void setActive(boolean active) {
1338        this.active = active;
1339    }
1340
1341    /**
1342     * @return true if the Connection is starting
1343     */
1344    public boolean isStarting() {
1345        return starting.get();
1346    }
1347
1348    @Override
1349    public synchronized boolean isNetworkConnection() {
1350        return networkConnection;
1351    }
1352
1353    @Override
1354    public boolean isFaultTolerantConnection() {
1355        return this.faultTolerantConnection;
1356    }
1357
1358    protected void setStarting(boolean starting) {
1359        this.starting.set(starting);
1360    }
1361
1362    /**
1363     * @return true if the Connection needs to stop
1364     */
1365    public boolean isPendingStop() {
1366        return pendingStop.get();
1367    }
1368
1369    protected void setPendingStop(boolean pendingStop) {
1370        this.pendingStop.set(pendingStop);
1371    }
1372
1373    @Override
1374    public Response processBrokerInfo(BrokerInfo info) {
1375        if (info.isSlaveBroker()) {
1376            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1377        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1378            // so this TransportConnection is the rear end of a network bridge
1379            // We have been requested to create a two way pipe ...
1380            try {
1381                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1382                Map<String, String> props = createMap(properties);
1383                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1384                IntrospectionSupport.setProperties(config, props, "");
1385                config.setBrokerName(broker.getBrokerName());
1386
1387                // check for existing duplex connection hanging about
1388
1389                // We first look if existing network connection already exists for the same broker Id and network connector name
1390                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1391                // and the duplex network connector side wanting to open a new one
1392                // In this case, the old connection must be broken
1393                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1394                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1395                synchronized (connections) {
1396                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1397                        TransportConnection c = iter.next();
1398                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1399                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1400                            c.stopAsync();
1401                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1402                            c.getStopped().await(1, TimeUnit.SECONDS);
1403                        }
1404                    }
1405                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1406                }
1407                Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
1408                Transport remoteBridgeTransport = transport;
1409                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1410                    // the vm transport case is already wrapped
1411                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1412                }
1413                String duplexName = localTransport.toString();
1414                if (duplexName.contains("#")) {
1415                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1416                }
1417                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
1418                listener.setCreatedByDuplex(true);
1419                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1420                duplexBridge.setBrokerService(brokerService);
1421                // now turn duplex off this side
1422                info.setDuplexConnection(false);
1423                duplexBridge.setCreatedByDuplex(true);
1424                duplexBridge.duplexStart(this, brokerInfo, info);
1425                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1426                return null;
1427            } catch (TransportDisposedIOException e) {
1428                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1429                return null;
1430            } catch (Exception e) {
1431                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1432                return null;
1433            }
1434        }
1435        // We only expect to get one broker info command per connection
1436        if (this.brokerInfo != null) {
1437            LOG.warn("Unexpected extra broker info command received: {}", info);
1438        }
1439        this.brokerInfo = info;
1440        networkConnection = true;
1441        List<TransportConnectionState> connectionStates = listConnectionStates();
1442        for (TransportConnectionState cs : connectionStates) {
1443            cs.getContext().setNetworkConnection(true);
1444        }
1445        return null;
1446    }
1447
1448    @SuppressWarnings({"unchecked", "rawtypes"})
1449    private HashMap<String, String> createMap(Properties properties) {
1450        return new HashMap(properties);
1451    }
1452
1453    protected void dispatch(Command command) throws IOException {
1454        try {
1455            setMarkedCandidate(true);
1456            transport.oneway(command);
1457        } finally {
1458            setMarkedCandidate(false);
1459        }
1460    }
1461
1462    @Override
1463    public String getRemoteAddress() {
1464        return transport.getRemoteAddress();
1465    }
1466
1467    public Transport getTransport() {
1468        return transport;
1469    }
1470
1471    @Override
1472    public String getConnectionId() {
1473        List<TransportConnectionState> connectionStates = listConnectionStates();
1474        for (TransportConnectionState cs : connectionStates) {
1475            if (cs.getInfo().getClientId() != null) {
1476                return cs.getInfo().getClientId();
1477            }
1478            return cs.getInfo().getConnectionId().toString();
1479        }
1480        return null;
1481    }
1482
1483    @Override
1484    public void updateClient(ConnectionControl control) {
1485        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1486                && this.wireFormatInfo.getVersion() >= 6) {
1487            dispatchAsync(control);
1488        }
1489    }
1490
1491    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1492        ProducerBrokerExchange result = null;
1493        if (producerInfo != null && producerInfo.getProducerId() != null){
1494            synchronized (producerExchanges){
1495                result = producerExchanges.get(producerInfo.getProducerId());
1496            }
1497        }
1498        return result;
1499    }
1500
1501    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1502        ProducerBrokerExchange result = producerExchanges.get(id);
1503        if (result == null) {
1504            synchronized (producerExchanges) {
1505                result = new ProducerBrokerExchange();
1506                TransportConnectionState state = lookupConnectionState(id);
1507                context = state.getContext();
1508                result.setConnectionContext(context);
1509                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1510                    result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
1511                }
1512                SessionState ss = state.getSessionState(id.getParentId());
1513                if (ss != null) {
1514                    result.setProducerState(ss.getProducerState(id));
1515                    ProducerState producerState = ss.getProducerState(id);
1516                    if (producerState != null && producerState.getInfo() != null) {
1517                        ProducerInfo info = producerState.getInfo();
1518                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1519                    }
1520                }
1521                producerExchanges.put(id, result);
1522            }
1523        } else {
1524            context = result.getConnectionContext();
1525        }
1526        return result;
1527    }
1528
1529    private void removeProducerBrokerExchange(ProducerId id) {
1530        synchronized (producerExchanges) {
1531            producerExchanges.remove(id);
1532        }
1533    }
1534
1535    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1536        ConsumerBrokerExchange result = consumerExchanges.get(id);
1537        return result;
1538    }
1539
1540    private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
1541        ConsumerBrokerExchange result = consumerExchanges.get(id);
1542        if (result == null) {
1543            synchronized (consumerExchanges) {
1544                result = new ConsumerBrokerExchange();
1545                context = connectionState.getContext();
1546                result.setConnectionContext(context);
1547                SessionState ss = connectionState.getSessionState(id.getParentId());
1548                if (ss != null) {
1549                    ConsumerState cs = ss.getConsumerState(id);
1550                    if (cs != null) {
1551                        ConsumerInfo info = cs.getInfo();
1552                        if (info != null) {
1553                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1554                                result.setWildcard(true);
1555                            }
1556                        }
1557                    }
1558                }
1559                consumerExchanges.put(id, result);
1560            }
1561        }
1562        return result;
1563    }
1564
1565    private void removeConsumerBrokerExchange(ConsumerId id) {
1566        synchronized (consumerExchanges) {
1567            consumerExchanges.remove(id);
1568        }
1569    }
1570
1571    public int getProtocolVersion() {
1572        return protocolVersion.get();
1573    }
1574
1575    @Override
1576    public Response processControlCommand(ControlCommand command) throws Exception {
1577        return null;
1578    }
1579
1580    @Override
1581    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1582        return null;
1583    }
1584
1585    @Override
1586    public Response processConnectionControl(ConnectionControl control) throws Exception {
1587        if (control != null) {
1588            faultTolerantConnection = control.isFaultTolerant();
1589        }
1590        return null;
1591    }
1592
1593    @Override
1594    public Response processConnectionError(ConnectionError error) throws Exception {
1595        return null;
1596    }
1597
1598    @Override
1599    public Response processConsumerControl(ConsumerControl control) throws Exception {
1600        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1601        broker.processConsumerControl(consumerExchange, control);
1602        return null;
1603    }
1604
1605    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1606                                                                            TransportConnectionState state) {
1607        TransportConnectionState cs = null;
1608        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1609            // swap implementations
1610            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1611            newRegister.intialize(connectionStateRegister);
1612            connectionStateRegister = newRegister;
1613        }
1614        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1615        return cs;
1616    }
1617
1618    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1619        return connectionStateRegister.unregisterConnectionState(connectionId);
1620    }
1621
1622    protected synchronized List<TransportConnectionState> listConnectionStates() {
1623        return connectionStateRegister.listConnectionStates();
1624    }
1625
1626    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1627        return connectionStateRegister.lookupConnectionState(connectionId);
1628    }
1629
1630    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1631        return connectionStateRegister.lookupConnectionState(id);
1632    }
1633
1634    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1635        return connectionStateRegister.lookupConnectionState(id);
1636    }
1637
1638    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1639        return connectionStateRegister.lookupConnectionState(id);
1640    }
1641
1642    // public only for testing
1643    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1644        return connectionStateRegister.lookupConnectionState(connectionId);
1645    }
1646
1647    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1648        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1649    }
1650
1651    protected synchronized String getDuplexNetworkConnectorId() {
1652        return this.duplexNetworkConnectorId;
1653    }
1654
1655    public boolean isStopping() {
1656        return stopping.get();
1657    }
1658
1659    protected CountDownLatch getStopped() {
1660        return stopped;
1661    }
1662
1663    private int getProducerCount(ConnectionId connectionId) {
1664        int result = 0;
1665        TransportConnectionState cs = lookupConnectionState(connectionId);
1666        if (cs != null) {
1667            for (SessionId sessionId : cs.getSessionIds()) {
1668                SessionState sessionState = cs.getSessionState(sessionId);
1669                if (sessionState != null) {
1670                    result += sessionState.getProducerIds().size();
1671                }
1672            }
1673        }
1674        return result;
1675    }
1676
1677    private int getConsumerCount(ConnectionId connectionId) {
1678        int result = 0;
1679        TransportConnectionState cs = lookupConnectionState(connectionId);
1680        if (cs != null) {
1681            for (SessionId sessionId : cs.getSessionIds()) {
1682                SessionState sessionState = cs.getSessionState(sessionId);
1683                if (sessionState != null) {
1684                    result += sessionState.getConsumerIds().size();
1685                }
1686            }
1687        }
1688        return result;
1689    }
1690
1691    public WireFormatInfo getRemoteWireFormatInfo() {
1692        return wireFormatInfo;
1693    }
1694}