001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
020import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
021import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID;
022import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD;
023import static org.apache.activemq.transport.amqp.AmqpSupport.PLATFORM;
024import static org.apache.activemq.transport.amqp.AmqpSupport.PRODUCT;
025import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
026import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
027import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
028import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
029import static org.apache.activemq.transport.amqp.AmqpSupport.VERSION;
030import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
031
032import java.io.BufferedReader;
033import java.io.IOException;
034import java.io.InputStream;
035import java.io.InputStreamReader;
036import java.nio.ByteBuffer;
037import java.util.HashMap;
038import java.util.Map;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicInteger;
043
044import javax.jms.InvalidClientIDException;
045
046import org.apache.activemq.broker.BrokerService;
047import org.apache.activemq.broker.region.AbstractRegion;
048import org.apache.activemq.broker.region.DurableTopicSubscription;
049import org.apache.activemq.broker.region.RegionBroker;
050import org.apache.activemq.broker.region.Subscription;
051import org.apache.activemq.broker.region.TopicRegion;
052import org.apache.activemq.command.ActiveMQDestination;
053import org.apache.activemq.command.ActiveMQTempDestination;
054import org.apache.activemq.command.ActiveMQTempQueue;
055import org.apache.activemq.command.ActiveMQTempTopic;
056import org.apache.activemq.command.Command;
057import org.apache.activemq.command.ConnectionError;
058import org.apache.activemq.command.ConnectionId;
059import org.apache.activemq.command.ConnectionInfo;
060import org.apache.activemq.command.ConsumerControl;
061import org.apache.activemq.command.ConsumerId;
062import org.apache.activemq.command.ConsumerInfo;
063import org.apache.activemq.command.DestinationInfo;
064import org.apache.activemq.command.ExceptionResponse;
065import org.apache.activemq.command.LocalTransactionId;
066import org.apache.activemq.command.MessageDispatch;
067import org.apache.activemq.command.RemoveInfo;
068import org.apache.activemq.command.Response;
069import org.apache.activemq.command.SessionId;
070import org.apache.activemq.command.ShutdownInfo;
071import org.apache.activemq.command.TransactionId;
072import org.apache.activemq.transport.InactivityIOException;
073import org.apache.activemq.transport.amqp.AmqpHeader;
074import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
075import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
076import org.apache.activemq.transport.amqp.AmqpProtocolException;
077import org.apache.activemq.transport.amqp.AmqpTransport;
078import org.apache.activemq.transport.amqp.AmqpTransportFilter;
079import org.apache.activemq.transport.amqp.AmqpWireFormat;
080import org.apache.activemq.transport.amqp.ResponseHandler;
081import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator;
082import org.apache.activemq.util.IOExceptionSupport;
083import org.apache.activemq.util.IdGenerator;
084import org.apache.qpid.proton.Proton;
085import org.apache.qpid.proton.amqp.Symbol;
086import org.apache.qpid.proton.amqp.transaction.Coordinator;
087import org.apache.qpid.proton.amqp.transport.AmqpError;
088import org.apache.qpid.proton.amqp.transport.ErrorCondition;
089import org.apache.qpid.proton.engine.Collector;
090import org.apache.qpid.proton.engine.Connection;
091import org.apache.qpid.proton.engine.Delivery;
092import org.apache.qpid.proton.engine.EndpointState;
093import org.apache.qpid.proton.engine.Event;
094import org.apache.qpid.proton.engine.Link;
095import org.apache.qpid.proton.engine.Receiver;
096import org.apache.qpid.proton.engine.Sender;
097import org.apache.qpid.proton.engine.Session;
098import org.apache.qpid.proton.engine.Transport;
099import org.apache.qpid.proton.engine.impl.CollectorImpl;
100import org.apache.qpid.proton.engine.impl.ProtocolTracer;
101import org.apache.qpid.proton.engine.impl.TransportImpl;
102import org.apache.qpid.proton.framing.TransportFrame;
103import org.fusesource.hawtbuf.Buffer;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107/**
108 * Implements the mechanics of managing a single remote peer connection.
109 */
110public class AmqpConnection implements AmqpProtocolConverter {
111
112    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
113    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
114    private static final int CHANNEL_MAX = 32767;
115    private static final String BROKER_VERSION;
116    private static final String BROKER_PLATFORM;
117
118    static {
119        String javaVersion = System.getProperty("java.version");
120
121        BROKER_PLATFORM = "Java/" + (javaVersion == null ? "unknown" : javaVersion);
122
123        InputStream in = null;
124        String version = "5.12.0";
125        if ((in = AmqpConnection.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
126            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
127            try {
128                version = reader.readLine();
129            } catch(Exception e) {
130            }
131        }
132        BROKER_VERSION = version;
133    }
134
135    private final Transport protonTransport = Proton.transport();
136    private final Connection protonConnection = Proton.connection();
137    private final Collector eventCollector = new CollectorImpl();
138
139    private final AmqpTransport amqpTransport;
140    private final AmqpWireFormat amqpWireFormat;
141    private final BrokerService brokerService;
142
143    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
144    private final AtomicInteger lastCommandId = new AtomicInteger();
145    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
146    private final ConnectionInfo connectionInfo = new ConnectionInfo();
147    private long nextSessionId;
148    private long nextTempDestinationId;
149    private long nextTransactionId;
150    private boolean closing;
151    private boolean closedSocket;
152    private AmqpAuthenticator authenticator;
153
154    private final Map<TransactionId, AmqpTransactionCoordinator> transactions = new HashMap<TransactionId, AmqpTransactionCoordinator>();
155    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
156    private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
157
158    public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
159        this.amqpTransport = transport;
160
161        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
162        if (monitor != null) {
163            monitor.setAmqpTransport(amqpTransport);
164        }
165
166        this.amqpWireFormat = transport.getWireFormat();
167        this.brokerService = brokerService;
168
169        // the configured maxFrameSize on the URI.
170        int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
171        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
172            this.protonTransport.setMaxFrameSize(maxFrameSize);
173        }
174
175        this.protonTransport.bind(this.protonConnection);
176        this.protonTransport.setChannelMax(CHANNEL_MAX);
177        this.protonTransport.setEmitFlowEventOnSend(false);
178
179        this.protonConnection.collect(eventCollector);
180
181        updateTracer();
182    }
183
184    /**
185     * Load and return a <code>[]Symbol</code> that contains the connection capabilities
186     * offered to new connections
187     *
188     * @return the capabilities that are offered to new clients on connect.
189     */
190    protected Symbol[] getConnectionCapabilitiesOffered() {
191        return new Symbol[]{ ANONYMOUS_RELAY };
192    }
193
194    /**
195     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
196     * that this connection supplies to incoming connections.
197     *
198     * @return the properties that are offered to the incoming connection.
199     */
200    protected Map<Symbol, Object> getConnetionProperties() {
201        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
202
203        properties.put(QUEUE_PREFIX, "queue://");
204        properties.put(TOPIC_PREFIX, "topic://");
205        properties.put(PRODUCT, "ActiveMQ");
206        properties.put(VERSION, BROKER_VERSION);
207        properties.put(PLATFORM, BROKER_PLATFORM);
208
209        return properties;
210    }
211
212    /**
213     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
214     * that this connection supplies to incoming connections when the open has failed
215     * and the remote should expect a close to follow.
216     *
217     * @return the properties that are offered to the incoming connection.
218     */
219    protected Map<Symbol, Object> getFailedConnetionProperties() {
220        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
221
222        properties.put(CONNECTION_OPEN_FAILED, true);
223
224        return properties;
225    }
226
227    @Override
228    public void updateTracer() {
229        if (amqpTransport.isTrace()) {
230            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
231                @Override
232                public void receivedFrame(TransportFrame transportFrame) {
233                    TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
234                }
235
236                @Override
237                public void sentFrame(TransportFrame transportFrame) {
238                    TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
239                }
240            });
241        }
242    }
243
244    @Override
245    public long keepAlive() throws IOException {
246        long rescheduleAt = 0l;
247
248        LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());
249
250        if (protonConnection.getLocalState() != EndpointState.CLOSED) {
251            // Using nano time since it is not related to the wall clock, which may change
252            long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
253            rescheduleAt = protonTransport.tick(now) - now;
254            pumpProtonToSocket();
255            if (protonTransport.isClosed()) {
256                rescheduleAt = 0;
257                LOG.debug("Transport closed after inactivity check.");
258                throw new InactivityIOException("Channel was inactive for to long");
259            }
260        }
261
262        LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.",
263                  amqpTransport.getRemoteAddress(), rescheduleAt);
264
265        return rescheduleAt;
266    }
267
268    //----- Connection Properties Accessors ----------------------------------//
269
270    /**
271     * @return the amount of credit assigned to AMQP receiver links created from
272     *         sender links on the remote peer.
273     */
274    public int getConfiguredReceiverCredit() {
275        return amqpWireFormat.getProducerCredit();
276    }
277
278    /**
279     * @return the transformer type that was configured for this AMQP transport.
280     */
281    public String getConfiguredTransformer() {
282        return amqpWireFormat.getTransformer();
283    }
284
285    /**
286     * @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
287     */
288    public ConnectionId getConnectionId() {
289        return connectionId;
290    }
291
292    /**
293     * @return the Client ID used to create the connection with ActiveMQ
294     */
295    public String getClientId() {
296        return connectionInfo.getClientId();
297    }
298
299    /**
300     * @return the configured max frame size allowed for incoming messages.
301     */
302    public long getMaxFrameSize() {
303        return amqpWireFormat.getMaxFrameSize();
304    }
305
306    //----- Proton Event handling and IO support -----------------------------//
307
308    void pumpProtonToSocket() {
309        try {
310            boolean done = false;
311            while (!done) {
312                ByteBuffer toWrite = protonTransport.getOutputBuffer();
313                if (toWrite != null && toWrite.hasRemaining()) {
314                    LOG.trace("Sending {} bytes out", toWrite.limit());
315                    amqpTransport.sendToAmqp(toWrite);
316                    protonTransport.outputConsumed();
317                } else {
318                    done = true;
319                }
320            }
321        } catch (IOException e) {
322            amqpTransport.onException(e);
323        }
324    }
325
326    @Override
327    public void onAMQPData(Object command) throws Exception {
328        Buffer frame;
329        if (command.getClass() == AmqpHeader.class) {
330            AmqpHeader header = (AmqpHeader) command;
331
332            if (amqpWireFormat.isHeaderValid(header)) {
333                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
334            } else {
335                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
336                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
337                amqpTransport.sendToAmqp(reply.getBuffer());
338                handleException(new AmqpProtocolException(
339                    "Connection from client using unsupported AMQP attempted", true));
340            }
341
342            switch (header.getProtocolId()) {
343                case 0:
344                    authenticator = null;
345                    break; // nothing to do..
346                case 3: // Client will be using SASL for auth..
347                    authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService);
348                    break;
349                default:
350            }
351            frame = header.getBuffer();
352        } else {
353            frame = (Buffer) command;
354        }
355
356        if (protonTransport.isClosed()) {
357            LOG.debug("Ignoring incoming AMQP data, transport is closed.");
358            return;
359        }
360
361        while (frame.length > 0) {
362            try {
363                int count = protonTransport.input(frame.data, frame.offset, frame.length);
364                frame.moveHead(count);
365            } catch (Throwable e) {
366                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
367                return;
368            }
369
370            if (authenticator != null) {
371                processSaslExchange();
372            } else {
373                processProtonEvents();
374            }
375        }
376    }
377
378    private void processSaslExchange() throws Exception {
379        authenticator.processSaslExchange(connectionInfo);
380        if (authenticator.isDone()) {
381            amqpTransport.getWireFormat().resetMagicRead();
382        }
383        pumpProtonToSocket();
384    }
385
386    private void processProtonEvents() throws Exception {
387        try {
388            Event event = null;
389            while ((event = eventCollector.peek()) != null) {
390                if (amqpTransport.isTrace()) {
391                    LOG.trace("Processing event: {}", event.getType());
392                }
393                switch (event.getType()) {
394                    case CONNECTION_REMOTE_OPEN:
395                        processConnectionOpen(event.getConnection());
396                        break;
397                    case CONNECTION_REMOTE_CLOSE:
398                        processConnectionClose(event.getConnection());
399                        break;
400                    case SESSION_REMOTE_OPEN:
401                        processSessionOpen(event.getSession());
402                        break;
403                    case SESSION_REMOTE_CLOSE:
404                        processSessionClose(event.getSession());
405                        break;
406                    case LINK_REMOTE_OPEN:
407                        processLinkOpen(event.getLink());
408                        break;
409                    case LINK_REMOTE_DETACH:
410                        processLinkDetach(event.getLink());
411                        break;
412                    case LINK_REMOTE_CLOSE:
413                        processLinkClose(event.getLink());
414                        break;
415                    case LINK_FLOW:
416                        processLinkFlow(event.getLink());
417                        break;
418                    case DELIVERY:
419                        processDelivery(event.getDelivery());
420                        break;
421                    default:
422                        break;
423                }
424
425                eventCollector.pop();
426            }
427
428        } catch (Throwable e) {
429            handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
430        }
431
432        pumpProtonToSocket();
433    }
434
435    protected void processConnectionOpen(Connection connection) throws Exception {
436
437        stopConnectionTimeoutChecker();
438
439        connectionInfo.setResponseRequired(true);
440        connectionInfo.setConnectionId(connectionId);
441
442        String clientId = protonConnection.getRemoteContainer();
443        if (clientId != null && !clientId.isEmpty()) {
444            connectionInfo.setClientId(clientId);
445        }
446
447        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
448
449        if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) {
450            // We cannot meet the requested Idle processing because the inactivity monitor is
451            // disabled so we won't send idle frames to match the request.
452            protonConnection.setProperties(getFailedConnetionProperties());
453            protonConnection.open();
454            protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames"));
455            protonConnection.close();
456            pumpProtonToSocket();
457
458            amqpTransport.onException(new IOException(
459                "Connection failed, remote requested idle processing but inactivity monitoring is disbaled."));
460            return;
461        }
462
463        sendToActiveMQ(connectionInfo, new ResponseHandler() {
464            @Override
465            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
466                Throwable exception = null;
467                try {
468                    if (response.isException()) {
469                        protonConnection.setProperties(getFailedConnetionProperties());
470                        protonConnection.open();
471
472                        exception = ((ExceptionResponse) response).getException();
473                        if (exception instanceof SecurityException) {
474                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
475                        } else if (exception instanceof InvalidClientIDException) {
476                            ErrorCondition condition = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
477
478                            Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
479                            infoMap.put(INVALID_FIELD, CONTAINER_ID);
480                            condition.setInfo(infoMap);
481
482                            protonConnection.setCondition(condition);
483                        } else {
484                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
485                        }
486
487                        protonConnection.close();
488                    } else {
489
490                        if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
491                            LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
492                            protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
493                        }
494
495                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
496                        protonConnection.setProperties(getConnetionProperties());
497                        protonConnection.setContainer(brokerService.getBrokerName());
498                        protonConnection.open();
499
500                        configureInactivityMonitor();
501                    }
502                } finally {
503                    pumpProtonToSocket();
504
505                    if (response.isException()) {
506                        amqpTransport.onException(IOExceptionSupport.create(exception));
507                    }
508                }
509            }
510        });
511    }
512
513    protected void processConnectionClose(Connection connection) throws Exception {
514        if (!closing) {
515            closing = true;
516            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
517                @Override
518                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
519                    protonConnection.close();
520                    protonConnection.free();
521
522                    if (!closedSocket) {
523                        pumpProtonToSocket();
524                    }
525                }
526            });
527
528            sendToActiveMQ(new ShutdownInfo());
529        }
530    }
531
532    protected void processSessionOpen(Session protonSession) throws Exception {
533        new AmqpSession(this, getNextSessionId(), protonSession).open();
534    }
535
536    protected void processSessionClose(Session protonSession) throws Exception {
537        if (protonSession.getContext() != null) {
538            ((AmqpResource) protonSession.getContext()).close();
539        } else {
540            protonSession.close();
541            protonSession.free();
542        }
543    }
544
545    protected void processLinkOpen(Link link) throws Exception {
546        link.setSource(link.getRemoteSource());
547        link.setTarget(link.getRemoteTarget());
548
549        AmqpSession session = (AmqpSession) link.getSession().getContext();
550        if (link instanceof Receiver) {
551            if (link.getRemoteTarget() instanceof Coordinator) {
552                session.createCoordinator((Receiver) link);
553            } else {
554                session.createReceiver((Receiver) link);
555            }
556        } else {
557            session.createSender((Sender) link);
558        }
559    }
560
561    protected void processLinkDetach(Link link) throws Exception {
562        Object context = link.getContext();
563
564        if (context instanceof AmqpLink) {
565            ((AmqpLink) context).detach();
566        } else {
567            link.detach();
568            link.free();
569        }
570    }
571
572    protected void processLinkClose(Link link) throws Exception {
573        Object context = link.getContext();
574
575        if (context instanceof AmqpLink) {
576            ((AmqpLink) context).close();;
577        } else {
578            link.close();
579            link.free();
580        }
581    }
582
583    protected void processLinkFlow(Link link) throws Exception {
584        Object context = link.getContext();
585        if (context instanceof AmqpLink) {
586            ((AmqpLink) context).flow();
587        }
588    }
589
590    protected void processDelivery(Delivery delivery) throws Exception {
591        if (!delivery.isPartial()) {
592            Object context = delivery.getLink().getContext();
593            if (context instanceof AmqpLink) {
594                AmqpLink amqpLink = (AmqpLink) context;
595                amqpLink.delivery(delivery);
596            }
597        }
598    }
599
600    //----- Event entry points for ActiveMQ commands and errors --------------//
601
602    @Override
603    public void onAMQPException(IOException error) {
604        closedSocket = true;
605        if (!closing) {
606            try {
607                closing = true;
608                // Attempt to inform the other end that we are going to close
609                // so that the client doesn't wait around forever.
610                protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
611                protonConnection.close();
612                pumpProtonToSocket();
613            } catch (Exception ignore) {
614            }
615            amqpTransport.sendToActiveMQ(error);
616        } else {
617            try {
618                amqpTransport.stop();
619            } catch (Exception ignore) {
620            }
621        }
622    }
623
624    @Override
625    public void onActiveMQCommand(Command command) throws Exception {
626        if (command.isResponse()) {
627            Response response = (Response) command;
628            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
629            if (rh != null) {
630                rh.onResponse(this, response);
631            } else {
632                // Pass down any unexpected errors. Should this close the connection?
633                if (response.isException()) {
634                    Throwable exception = ((ExceptionResponse) response).getException();
635                    handleException(exception);
636                }
637            }
638        } else if (command.isMessageDispatch()) {
639            MessageDispatch dispatch = (MessageDispatch) command;
640            AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
641            if (sender != null) {
642                // End of Queue Browse will have no Message object.
643                if (dispatch.getMessage() != null) {
644                    LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
645                } else {
646                    LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
647                }
648                sender.onMessageDispatch(dispatch);
649                if (dispatch.getMessage() != null) {
650                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
651                }
652            }
653        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
654            // Pass down any unexpected async errors. Should this close the connection?
655            Throwable exception = ((ConnectionError) command).getException();
656            handleException(exception);
657        } else if (command.isConsumerControl()) {
658            ConsumerControl control = (ConsumerControl) command;
659            AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId());
660            if (sender != null) {
661                sender.onConsumerControl(control);
662            }
663        } else if (command.isBrokerInfo()) {
664            // ignore
665        } else {
666            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
667        }
668    }
669
670    //----- Utility methods for connection resources to use ------------------//
671
672    void registerSender(ConsumerId consumerId, AmqpSender sender) {
673        subscriptionsByConsumerId.put(consumerId, sender);
674    }
675
676    void unregisterSender(ConsumerId consumerId) {
677        subscriptionsByConsumerId.remove(consumerId);
678    }
679
680    void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) {
681        transactions.put(txId, coordinator);
682    }
683
684    void unregisterTransaction(TransactionId txId) {
685        transactions.remove(txId);
686    }
687
688    AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) {
689        return transactions.get(txId);
690    }
691
692    LocalTransactionId getNextTransactionId() {
693        return new LocalTransactionId(getConnectionId(), ++nextTransactionId);
694    }
695
696    ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException {
697        ConsumerInfo result = null;
698        RegionBroker regionBroker;
699
700        try {
701            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
702        } catch (Exception e) {
703            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
704        }
705
706        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
707        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
708        if (subscription != null) {
709            result = subscription.getConsumerInfo();
710        }
711
712        return result;
713    }
714
715
716    Subscription lookupPrefetchSubscription(ConsumerInfo consumerInfo)  {
717        Subscription subscription = null;
718        try {
719            subscription = ((AbstractRegion)((RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class)).getRegion(consumerInfo.getDestination())).getSubscriptions().get(consumerInfo.getConsumerId());
720        } catch (Exception e) {
721            LOG.warn("Error finding subscription for: " + consumerInfo + ": " + e.getMessage(), false, e);
722        }
723        return subscription;
724    }
725
726    ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
727        ActiveMQDestination rc = null;
728        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
729            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
730        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
731            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
732        } else {
733            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
734            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
735        }
736
737        DestinationInfo info = new DestinationInfo();
738        info.setConnectionId(connectionId);
739        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
740        info.setDestination(rc);
741
742        sendToActiveMQ(info, new ResponseHandler() {
743
744            @Override
745            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
746                if (response.isException()) {
747                    link.setSource(null);
748
749                    Throwable exception = ((ExceptionResponse) response).getException();
750                    if (exception instanceof SecurityException) {
751                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
752                    } else {
753                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
754                    }
755
756                    link.close();
757                    link.free();
758                }
759            }
760        });
761
762        return rc;
763    }
764
765    void deleteTemporaryDestination(ActiveMQTempDestination destination) {
766        DestinationInfo info = new DestinationInfo();
767        info.setConnectionId(connectionId);
768        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
769        info.setDestination(destination);
770
771        sendToActiveMQ(info, new ResponseHandler() {
772
773            @Override
774            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
775                if (response.isException()) {
776                    Throwable exception = ((ExceptionResponse) response).getException();
777                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
778                }
779            }
780        });
781    }
782
783    void sendToActiveMQ(Command command) {
784        sendToActiveMQ(command, null);
785    }
786
787    void sendToActiveMQ(Command command, ResponseHandler handler) {
788        command.setCommandId(lastCommandId.incrementAndGet());
789        if (handler != null) {
790            command.setResponseRequired(true);
791            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
792        }
793        amqpTransport.sendToActiveMQ(command);
794    }
795
796    void handleException(Throwable exception) {
797        LOG.debug("Exception detail", exception);
798        if (exception instanceof AmqpProtocolException) {
799            onAMQPException((IOException) exception);
800        } else {
801            try {
802                // Must ensure that the broker removes Connection resources.
803                sendToActiveMQ(new ShutdownInfo());
804                amqpTransport.stop();
805            } catch (Throwable e) {
806                LOG.error("Failed to stop AMQP Transport ", e);
807            }
808        }
809    }
810
811    //----- Internal implementation ------------------------------------------//
812
813    private SessionId getNextSessionId() {
814        return new SessionId(connectionId, nextSessionId++);
815    }
816
817    private void stopConnectionTimeoutChecker() {
818        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
819        if (monitor != null) {
820            monitor.stopConnectionTimeoutChecker();
821        }
822    }
823
824    private void configureInactivityMonitor() {
825        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
826        if (monitor == null) {
827            return;
828        }
829
830        // If either end has idle timeout requirements then the tick method
831        // will give us a deadline on the next time we need to tick() in order
832        // to meet those obligations.
833        // Using nano time since it is not related to the wall clock, which may change
834        long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
835        long nextIdleCheck = protonTransport.tick(now);
836        if (nextIdleCheck > 0) {
837            long delay = nextIdleCheck - now;
838            LOG.trace("Connection keep-alive processing starts in: {}", delay);
839            monitor.startKeepAliveTask(delay);
840        } else {
841            LOG.trace("Connection does not require keep-alive processing");
842        }
843    }
844}