001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
020import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
021import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID;
022import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD;
023import static org.apache.activemq.transport.amqp.AmqpSupport.PLATFORM;
024import static org.apache.activemq.transport.amqp.AmqpSupport.PRODUCT;
025import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
026import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
027import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
028import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
029import static org.apache.activemq.transport.amqp.AmqpSupport.VERSION;
030import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
031
032import java.io.BufferedReader;
033import java.io.IOException;
034import java.io.InputStream;
035import java.io.InputStreamReader;
036import java.nio.ByteBuffer;
037import java.util.HashMap;
038import java.util.Map;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicInteger;
043
044import javax.jms.InvalidClientIDException;
045
046import org.apache.activemq.broker.BrokerService;
047import org.apache.activemq.broker.region.DurableTopicSubscription;
048import org.apache.activemq.broker.region.RegionBroker;
049import org.apache.activemq.broker.region.TopicRegion;
050import org.apache.activemq.command.ActiveMQDestination;
051import org.apache.activemq.command.ActiveMQTempDestination;
052import org.apache.activemq.command.ActiveMQTempQueue;
053import org.apache.activemq.command.ActiveMQTempTopic;
054import org.apache.activemq.command.Command;
055import org.apache.activemq.command.ConnectionError;
056import org.apache.activemq.command.ConnectionId;
057import org.apache.activemq.command.ConnectionInfo;
058import org.apache.activemq.command.ConsumerControl;
059import org.apache.activemq.command.ConsumerId;
060import org.apache.activemq.command.ConsumerInfo;
061import org.apache.activemq.command.DestinationInfo;
062import org.apache.activemq.command.ExceptionResponse;
063import org.apache.activemq.command.LocalTransactionId;
064import org.apache.activemq.command.MessageDispatch;
065import org.apache.activemq.command.RemoveInfo;
066import org.apache.activemq.command.Response;
067import org.apache.activemq.command.SessionId;
068import org.apache.activemq.command.ShutdownInfo;
069import org.apache.activemq.command.TransactionId;
070import org.apache.activemq.transport.InactivityIOException;
071import org.apache.activemq.transport.amqp.AmqpHeader;
072import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
073import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
074import org.apache.activemq.transport.amqp.AmqpProtocolException;
075import org.apache.activemq.transport.amqp.AmqpTransport;
076import org.apache.activemq.transport.amqp.AmqpTransportFilter;
077import org.apache.activemq.transport.amqp.AmqpWireFormat;
078import org.apache.activemq.transport.amqp.ResponseHandler;
079import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator;
080import org.apache.activemq.util.IOExceptionSupport;
081import org.apache.activemq.util.IdGenerator;
082import org.apache.qpid.proton.Proton;
083import org.apache.qpid.proton.amqp.Symbol;
084import org.apache.qpid.proton.amqp.transaction.Coordinator;
085import org.apache.qpid.proton.amqp.transport.AmqpError;
086import org.apache.qpid.proton.amqp.transport.ErrorCondition;
087import org.apache.qpid.proton.engine.Collector;
088import org.apache.qpid.proton.engine.Connection;
089import org.apache.qpid.proton.engine.Delivery;
090import org.apache.qpid.proton.engine.EndpointState;
091import org.apache.qpid.proton.engine.Event;
092import org.apache.qpid.proton.engine.Link;
093import org.apache.qpid.proton.engine.Receiver;
094import org.apache.qpid.proton.engine.Sender;
095import org.apache.qpid.proton.engine.Session;
096import org.apache.qpid.proton.engine.Transport;
097import org.apache.qpid.proton.engine.impl.CollectorImpl;
098import org.apache.qpid.proton.engine.impl.ProtocolTracer;
099import org.apache.qpid.proton.engine.impl.TransportImpl;
100import org.apache.qpid.proton.framing.TransportFrame;
101import org.fusesource.hawtbuf.Buffer;
102import org.slf4j.Logger;
103import org.slf4j.LoggerFactory;
104
105/**
106 * Implements the mechanics of managing a single remote peer connection.
107 */
108public class AmqpConnection implements AmqpProtocolConverter {
109
110    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
111    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
112    private static final int CHANNEL_MAX = 32767;
113    private static final String BROKER_VERSION;
114    private static final String BROKER_PLATFORM;
115
116    static {
117        String javaVersion = System.getProperty("java.version");
118
119        BROKER_PLATFORM = "Java/" + (javaVersion == null ? "unknown" : javaVersion);
120
121        InputStream in = null;
122        String version = "5.12.0";
123        if ((in = AmqpConnection.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
124            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
125            try {
126                version = reader.readLine();
127            } catch(Exception e) {
128            }
129        }
130        BROKER_VERSION = version;
131    }
132
133    private final Transport protonTransport = Proton.transport();
134    private final Connection protonConnection = Proton.connection();
135    private final Collector eventCollector = new CollectorImpl();
136
137    private final AmqpTransport amqpTransport;
138    private final AmqpWireFormat amqpWireFormat;
139    private final BrokerService brokerService;
140
141    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
142    private final AtomicInteger lastCommandId = new AtomicInteger();
143    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
144    private final ConnectionInfo connectionInfo = new ConnectionInfo();
145    private long nextSessionId;
146    private long nextTempDestinationId;
147    private long nextTransactionId;
148    private boolean closing;
149    private boolean closedSocket;
150    private AmqpAuthenticator authenticator;
151
152    private final Map<TransactionId, AmqpTransactionCoordinator> transactions = new HashMap<TransactionId, AmqpTransactionCoordinator>();
153    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
154    private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
155
156    public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
157        this.amqpTransport = transport;
158
159        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
160        if (monitor != null) {
161            monitor.setAmqpTransport(amqpTransport);
162        }
163
164        this.amqpWireFormat = transport.getWireFormat();
165        this.brokerService = brokerService;
166
167        // the configured maxFrameSize on the URI.
168        int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
169        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
170            this.protonTransport.setMaxFrameSize(maxFrameSize);
171        }
172
173        this.protonTransport.bind(this.protonConnection);
174        this.protonTransport.setChannelMax(CHANNEL_MAX);
175        this.protonTransport.setEmitFlowEventOnSend(false);
176
177        this.protonConnection.collect(eventCollector);
178
179        updateTracer();
180    }
181
182    /**
183     * Load and return a <code>[]Symbol</code> that contains the connection capabilities
184     * offered to new connections
185     *
186     * @return the capabilities that are offered to new clients on connect.
187     */
188    protected Symbol[] getConnectionCapabilitiesOffered() {
189        return new Symbol[]{ ANONYMOUS_RELAY };
190    }
191
192    /**
193     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
194     * that this connection supplies to incoming connections.
195     *
196     * @return the properties that are offered to the incoming connection.
197     */
198    protected Map<Symbol, Object> getConnetionProperties() {
199        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
200
201        properties.put(QUEUE_PREFIX, "queue://");
202        properties.put(TOPIC_PREFIX, "topic://");
203        properties.put(PRODUCT, "ActiveMQ");
204        properties.put(VERSION, BROKER_VERSION);
205        properties.put(PLATFORM, BROKER_PLATFORM);
206
207        return properties;
208    }
209
210    /**
211     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
212     * that this connection supplies to incoming connections when the open has failed
213     * and the remote should expect a close to follow.
214     *
215     * @return the properties that are offered to the incoming connection.
216     */
217    protected Map<Symbol, Object> getFailedConnetionProperties() {
218        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
219
220        properties.put(CONNECTION_OPEN_FAILED, true);
221
222        return properties;
223    }
224
225    @Override
226    public void updateTracer() {
227        if (amqpTransport.isTrace()) {
228            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
229                @Override
230                public void receivedFrame(TransportFrame transportFrame) {
231                    TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
232                }
233
234                @Override
235                public void sentFrame(TransportFrame transportFrame) {
236                    TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
237                }
238            });
239        }
240    }
241
242    @Override
243    public long keepAlive() throws IOException {
244        long rescheduleAt = 0l;
245
246        LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());
247
248        if (protonConnection.getLocalState() != EndpointState.CLOSED) {
249            // Using nano time since it is not related to the wall clock, which may change
250            long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
251            rescheduleAt = protonTransport.tick(now) - now;
252            pumpProtonToSocket();
253            if (protonTransport.isClosed()) {
254                rescheduleAt = 0;
255                LOG.debug("Transport closed after inactivity check.");
256                throw new InactivityIOException("Channel was inactive for to long");
257            }
258        }
259
260        LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.",
261                  amqpTransport.getRemoteAddress(), rescheduleAt);
262
263        return rescheduleAt;
264    }
265
266    //----- Connection Properties Accessors ----------------------------------//
267
268    /**
269     * @return the amount of credit assigned to AMQP receiver links created from
270     *         sender links on the remote peer.
271     */
272    public int getConfiguredReceiverCredit() {
273        return amqpWireFormat.getProducerCredit();
274    }
275
276    /**
277     * @return the transformer type that was configured for this AMQP transport.
278     */
279    public String getConfiguredTransformer() {
280        return amqpWireFormat.getTransformer();
281    }
282
283    /**
284     * @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
285     */
286    public ConnectionId getConnectionId() {
287        return connectionId;
288    }
289
290    /**
291     * @return the Client ID used to create the connection with ActiveMQ
292     */
293    public String getClientId() {
294        return connectionInfo.getClientId();
295    }
296
297    /**
298     * @return the configured max frame size allowed for incoming messages.
299     */
300    public long getMaxFrameSize() {
301        return amqpWireFormat.getMaxFrameSize();
302    }
303
304    //----- Proton Event handling and IO support -----------------------------//
305
306    void pumpProtonToSocket() {
307        try {
308            boolean done = false;
309            while (!done) {
310                ByteBuffer toWrite = protonTransport.getOutputBuffer();
311                if (toWrite != null && toWrite.hasRemaining()) {
312                    LOG.trace("Sending {} bytes out", toWrite.limit());
313                    amqpTransport.sendToAmqp(toWrite);
314                    protonTransport.outputConsumed();
315                } else {
316                    done = true;
317                }
318            }
319        } catch (IOException e) {
320            amqpTransport.onException(e);
321        }
322    }
323
324    @Override
325    public void onAMQPData(Object command) throws Exception {
326        Buffer frame;
327        if (command.getClass() == AmqpHeader.class) {
328            AmqpHeader header = (AmqpHeader) command;
329
330            if (amqpWireFormat.isHeaderValid(header)) {
331                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
332            } else {
333                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
334                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
335                amqpTransport.sendToAmqp(reply.getBuffer());
336                handleException(new AmqpProtocolException(
337                    "Connection from client using unsupported AMQP attempted", true));
338            }
339
340            switch (header.getProtocolId()) {
341                case 0:
342                    authenticator = null;
343                    break; // nothing to do..
344                case 3: // Client will be using SASL for auth..
345                    authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService);
346                    break;
347                default:
348            }
349            frame = header.getBuffer();
350        } else {
351            frame = (Buffer) command;
352        }
353
354        if (protonTransport.isClosed()) {
355            LOG.debug("Ignoring incoming AMQP data, transport is closed.");
356            return;
357        }
358
359        while (frame.length > 0) {
360            try {
361                int count = protonTransport.input(frame.data, frame.offset, frame.length);
362                frame.moveHead(count);
363            } catch (Throwable e) {
364                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
365                return;
366            }
367
368            if (authenticator != null) {
369                processSaslExchange();
370            } else {
371                processProtonEvents();
372            }
373        }
374    }
375
376    private void processSaslExchange() throws Exception {
377        authenticator.processSaslExchange(connectionInfo);
378        if (authenticator.isDone()) {
379            amqpTransport.getWireFormat().resetMagicRead();
380        }
381        pumpProtonToSocket();
382    }
383
384    private void processProtonEvents() throws Exception {
385        try {
386            Event event = null;
387            while ((event = eventCollector.peek()) != null) {
388                if (amqpTransport.isTrace()) {
389                    LOG.trace("Processing event: {}", event.getType());
390                }
391                switch (event.getType()) {
392                    case CONNECTION_REMOTE_OPEN:
393                        processConnectionOpen(event.getConnection());
394                        break;
395                    case CONNECTION_REMOTE_CLOSE:
396                        processConnectionClose(event.getConnection());
397                        break;
398                    case SESSION_REMOTE_OPEN:
399                        processSessionOpen(event.getSession());
400                        break;
401                    case SESSION_REMOTE_CLOSE:
402                        processSessionClose(event.getSession());
403                        break;
404                    case LINK_REMOTE_OPEN:
405                        processLinkOpen(event.getLink());
406                        break;
407                    case LINK_REMOTE_DETACH:
408                        processLinkDetach(event.getLink());
409                        break;
410                    case LINK_REMOTE_CLOSE:
411                        processLinkClose(event.getLink());
412                        break;
413                    case LINK_FLOW:
414                        processLinkFlow(event.getLink());
415                        break;
416                    case DELIVERY:
417                        processDelivery(event.getDelivery());
418                        break;
419                    default:
420                        break;
421                }
422
423                eventCollector.pop();
424            }
425
426        } catch (Throwable e) {
427            handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
428        }
429
430        pumpProtonToSocket();
431    }
432
433    protected void processConnectionOpen(Connection connection) throws Exception {
434
435        stopConnectionTimeoutChecker();
436
437        connectionInfo.setResponseRequired(true);
438        connectionInfo.setConnectionId(connectionId);
439
440        String clientId = protonConnection.getRemoteContainer();
441        if (clientId != null && !clientId.isEmpty()) {
442            connectionInfo.setClientId(clientId);
443        }
444
445        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
446
447        if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) {
448            // We cannot meet the requested Idle processing because the inactivity monitor is
449            // disabled so we won't send idle frames to match the request.
450            protonConnection.setProperties(getFailedConnetionProperties());
451            protonConnection.open();
452            protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames"));
453            protonConnection.close();
454            pumpProtonToSocket();
455
456            amqpTransport.onException(new IOException(
457                "Connection failed, remote requested idle processing but inactivity monitoring is disbaled."));
458            return;
459        }
460
461        sendToActiveMQ(connectionInfo, new ResponseHandler() {
462            @Override
463            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
464                Throwable exception = null;
465                try {
466                    if (response.isException()) {
467                        protonConnection.setProperties(getFailedConnetionProperties());
468                        protonConnection.open();
469
470                        exception = ((ExceptionResponse) response).getException();
471                        if (exception instanceof SecurityException) {
472                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
473                        } else if (exception instanceof InvalidClientIDException) {
474                            ErrorCondition condition = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
475
476                            Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
477                            infoMap.put(INVALID_FIELD, CONTAINER_ID);
478                            condition.setInfo(infoMap);
479
480                            protonConnection.setCondition(condition);
481                        } else {
482                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
483                        }
484
485                        protonConnection.close();
486                    } else {
487
488                        if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
489                            LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
490                            protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
491                        }
492
493                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
494                        protonConnection.setProperties(getConnetionProperties());
495                        protonConnection.setContainer(brokerService.getBrokerName());
496                        protonConnection.open();
497
498                        configureInactivityMonitor();
499                    }
500                } finally {
501                    pumpProtonToSocket();
502
503                    if (response.isException()) {
504                        amqpTransport.onException(IOExceptionSupport.create(exception));
505                    }
506                }
507            }
508        });
509    }
510
511    protected void processConnectionClose(Connection connection) throws Exception {
512        if (!closing) {
513            closing = true;
514            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
515                @Override
516                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
517                    protonConnection.close();
518                    protonConnection.free();
519
520                    if (!closedSocket) {
521                        pumpProtonToSocket();
522                    }
523                }
524            });
525
526            sendToActiveMQ(new ShutdownInfo());
527        }
528    }
529
530    protected void processSessionOpen(Session protonSession) throws Exception {
531        new AmqpSession(this, getNextSessionId(), protonSession).open();
532    }
533
534    protected void processSessionClose(Session protonSession) throws Exception {
535        if (protonSession.getContext() != null) {
536            ((AmqpResource) protonSession.getContext()).close();
537        } else {
538            protonSession.close();
539            protonSession.free();
540        }
541    }
542
543    protected void processLinkOpen(Link link) throws Exception {
544        link.setSource(link.getRemoteSource());
545        link.setTarget(link.getRemoteTarget());
546
547        AmqpSession session = (AmqpSession) link.getSession().getContext();
548        if (link instanceof Receiver) {
549            if (link.getRemoteTarget() instanceof Coordinator) {
550                session.createCoordinator((Receiver) link);
551            } else {
552                session.createReceiver((Receiver) link);
553            }
554        } else {
555            session.createSender((Sender) link);
556        }
557    }
558
559    protected void processLinkDetach(Link link) throws Exception {
560        Object context = link.getContext();
561
562        if (context instanceof AmqpLink) {
563            ((AmqpLink) context).detach();
564        } else {
565            link.detach();
566            link.free();
567        }
568    }
569
570    protected void processLinkClose(Link link) throws Exception {
571        Object context = link.getContext();
572
573        if (context instanceof AmqpLink) {
574            ((AmqpLink) context).close();;
575        } else {
576            link.close();
577            link.free();
578        }
579    }
580
581    protected void processLinkFlow(Link link) throws Exception {
582        Object context = link.getContext();
583        if (context instanceof AmqpLink) {
584            ((AmqpLink) context).flow();
585        }
586    }
587
588    protected void processDelivery(Delivery delivery) throws Exception {
589        if (!delivery.isPartial()) {
590            Object context = delivery.getLink().getContext();
591            if (context instanceof AmqpLink) {
592                AmqpLink amqpLink = (AmqpLink) context;
593                amqpLink.delivery(delivery);
594            }
595        }
596    }
597
598    //----- Event entry points for ActiveMQ commands and errors --------------//
599
600    @Override
601    public void onAMQPException(IOException error) {
602        closedSocket = true;
603        if (!closing) {
604            try {
605                closing = true;
606                // Attempt to inform the other end that we are going to close
607                // so that the client doesn't wait around forever.
608                protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
609                protonConnection.close();
610                pumpProtonToSocket();
611            } catch (Exception ignore) {
612            }
613            amqpTransport.sendToActiveMQ(error);
614        } else {
615            try {
616                amqpTransport.stop();
617            } catch (Exception ignore) {
618            }
619        }
620    }
621
622    @Override
623    public void onActiveMQCommand(Command command) throws Exception {
624        if (command.isResponse()) {
625            Response response = (Response) command;
626            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
627            if (rh != null) {
628                rh.onResponse(this, response);
629            } else {
630                // Pass down any unexpected errors. Should this close the connection?
631                if (response.isException()) {
632                    Throwable exception = ((ExceptionResponse) response).getException();
633                    handleException(exception);
634                }
635            }
636        } else if (command.isMessageDispatch()) {
637            MessageDispatch dispatch = (MessageDispatch) command;
638            AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
639            if (sender != null) {
640                // End of Queue Browse will have no Message object.
641                if (dispatch.getMessage() != null) {
642                    LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
643                } else {
644                    LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
645                }
646                sender.onMessageDispatch(dispatch);
647                if (dispatch.getMessage() != null) {
648                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
649                }
650            }
651        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
652            // Pass down any unexpected async errors. Should this close the connection?
653            Throwable exception = ((ConnectionError) command).getException();
654            handleException(exception);
655        } else if (command.isConsumerControl()) {
656            ConsumerControl control = (ConsumerControl) command;
657            AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId());
658            if (sender != null) {
659                sender.onConsumerControl(control);
660            }
661        } else if (command.isBrokerInfo()) {
662            // ignore
663        } else {
664            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
665        }
666    }
667
668    //----- Utility methods for connection resources to use ------------------//
669
670    void registerSender(ConsumerId consumerId, AmqpSender sender) {
671        subscriptionsByConsumerId.put(consumerId, sender);
672    }
673
674    void unregisterSender(ConsumerId consumerId) {
675        subscriptionsByConsumerId.remove(consumerId);
676    }
677
678    void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) {
679        transactions.put(txId, coordinator);
680    }
681
682    void unregisterTransaction(TransactionId txId) {
683        transactions.remove(txId);
684    }
685
686    AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) {
687        return transactions.get(txId);
688    }
689
690    LocalTransactionId getNextTransactionId() {
691        return new LocalTransactionId(getConnectionId(), ++nextTransactionId);
692    }
693
694    ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException {
695        ConsumerInfo result = null;
696        RegionBroker regionBroker;
697
698        try {
699            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
700        } catch (Exception e) {
701            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
702        }
703
704        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
705        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
706        if (subscription != null) {
707            result = subscription.getConsumerInfo();
708        }
709
710        return result;
711    }
712
713    ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
714        ActiveMQDestination rc = null;
715        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
716            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
717        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
718            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
719        } else {
720            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
721            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
722        }
723
724        DestinationInfo info = new DestinationInfo();
725        info.setConnectionId(connectionId);
726        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
727        info.setDestination(rc);
728
729        sendToActiveMQ(info, new ResponseHandler() {
730
731            @Override
732            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
733                if (response.isException()) {
734                    link.setSource(null);
735
736                    Throwable exception = ((ExceptionResponse) response).getException();
737                    if (exception instanceof SecurityException) {
738                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
739                    } else {
740                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
741                    }
742
743                    link.close();
744                    link.free();
745                }
746            }
747        });
748
749        return rc;
750    }
751
752    void deleteTemporaryDestination(ActiveMQTempDestination destination) {
753        DestinationInfo info = new DestinationInfo();
754        info.setConnectionId(connectionId);
755        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
756        info.setDestination(destination);
757
758        sendToActiveMQ(info, new ResponseHandler() {
759
760            @Override
761            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
762                if (response.isException()) {
763                    Throwable exception = ((ExceptionResponse) response).getException();
764                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
765                }
766            }
767        });
768    }
769
770    void sendToActiveMQ(Command command) {
771        sendToActiveMQ(command, null);
772    }
773
774    void sendToActiveMQ(Command command, ResponseHandler handler) {
775        command.setCommandId(lastCommandId.incrementAndGet());
776        if (handler != null) {
777            command.setResponseRequired(true);
778            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
779        }
780        amqpTransport.sendToActiveMQ(command);
781    }
782
783    void handleException(Throwable exception) {
784        LOG.debug("Exception detail", exception);
785        if (exception instanceof AmqpProtocolException) {
786            onAMQPException((IOException) exception);
787        } else {
788            try {
789                // Must ensure that the broker removes Connection resources.
790                sendToActiveMQ(new ShutdownInfo());
791                amqpTransport.stop();
792            } catch (Throwable e) {
793                LOG.error("Failed to stop AMQP Transport ", e);
794            }
795        }
796    }
797
798    //----- Internal implementation ------------------------------------------//
799
800    private SessionId getNextSessionId() {
801        return new SessionId(connectionId, nextSessionId++);
802    }
803
804    private void stopConnectionTimeoutChecker() {
805        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
806        if (monitor != null) {
807            monitor.stopConnectionTimeoutChecker();
808        }
809    }
810
811    private void configureInactivityMonitor() {
812        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
813        if (monitor == null) {
814            return;
815        }
816
817        // If either end has idle timeout requirements then the tick method
818        // will give us a deadline on the next time we need to tick() in order
819        // to meet those obligations.
820        // Using nano time since it is not related to the wall clock, which may change
821        long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
822        long nextIdleCheck = protonTransport.tick(now);
823        if (nextIdleCheck > 0) {
824            long delay = nextIdleCheck - now;
825            LOG.trace("Connection keep-alive processing starts in: {}", delay);
826            monitor.startKeepAliveTask(delay);
827        } else {
828            LOG.trace("Connection does not require keep-alive processing");
829        }
830    }
831}