001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY; 020import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; 021import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID; 022import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD; 023import static org.apache.activemq.transport.amqp.AmqpSupport.PLATFORM; 024import static org.apache.activemq.transport.amqp.AmqpSupport.PRODUCT; 025import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX; 026import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; 027import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; 028import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX; 029import static org.apache.activemq.transport.amqp.AmqpSupport.VERSION; 030import static org.apache.activemq.transport.amqp.AmqpSupport.contains; 031 032import java.io.BufferedReader; 033import java.io.IOException; 034import java.io.InputStream; 035import java.io.InputStreamReader; 036import java.nio.ByteBuffer; 037import java.util.HashMap; 038import java.util.Map; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicInteger; 043 044import javax.jms.InvalidClientIDException; 045 046import org.apache.activemq.broker.BrokerService; 047import org.apache.activemq.broker.region.AbstractRegion; 048import org.apache.activemq.broker.region.DurableTopicSubscription; 049import org.apache.activemq.broker.region.RegionBroker; 050import org.apache.activemq.broker.region.Subscription; 051import org.apache.activemq.broker.region.TopicRegion; 052import org.apache.activemq.command.ActiveMQDestination; 053import org.apache.activemq.command.ActiveMQTempDestination; 054import org.apache.activemq.command.ActiveMQTempQueue; 055import org.apache.activemq.command.ActiveMQTempTopic; 056import org.apache.activemq.command.Command; 057import org.apache.activemq.command.ConnectionError; 058import org.apache.activemq.command.ConnectionId; 059import org.apache.activemq.command.ConnectionInfo; 060import org.apache.activemq.command.ConsumerControl; 061import org.apache.activemq.command.ConsumerId; 062import org.apache.activemq.command.ConsumerInfo; 063import org.apache.activemq.command.DestinationInfo; 064import org.apache.activemq.command.ExceptionResponse; 065import org.apache.activemq.command.LocalTransactionId; 066import org.apache.activemq.command.MessageDispatch; 067import org.apache.activemq.command.RemoveInfo; 068import org.apache.activemq.command.Response; 069import org.apache.activemq.command.SessionId; 070import org.apache.activemq.command.ShutdownInfo; 071import org.apache.activemq.command.TransactionId; 072import org.apache.activemq.transport.InactivityIOException; 073import org.apache.activemq.transport.amqp.AmqpHeader; 074import org.apache.activemq.transport.amqp.AmqpInactivityMonitor; 075import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 076import org.apache.activemq.transport.amqp.AmqpProtocolException; 077import org.apache.activemq.transport.amqp.AmqpTransport; 078import org.apache.activemq.transport.amqp.AmqpTransportFilter; 079import org.apache.activemq.transport.amqp.AmqpWireFormat; 080import org.apache.activemq.transport.amqp.ResponseHandler; 081import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator; 082import org.apache.activemq.util.IOExceptionSupport; 083import org.apache.activemq.util.IdGenerator; 084import org.apache.qpid.proton.Proton; 085import org.apache.qpid.proton.amqp.Symbol; 086import org.apache.qpid.proton.amqp.transaction.Coordinator; 087import org.apache.qpid.proton.amqp.transport.AmqpError; 088import org.apache.qpid.proton.amqp.transport.ErrorCondition; 089import org.apache.qpid.proton.engine.Collector; 090import org.apache.qpid.proton.engine.Connection; 091import org.apache.qpid.proton.engine.Delivery; 092import org.apache.qpid.proton.engine.EndpointState; 093import org.apache.qpid.proton.engine.Event; 094import org.apache.qpid.proton.engine.Link; 095import org.apache.qpid.proton.engine.Receiver; 096import org.apache.qpid.proton.engine.Sender; 097import org.apache.qpid.proton.engine.Session; 098import org.apache.qpid.proton.engine.Transport; 099import org.apache.qpid.proton.engine.impl.CollectorImpl; 100import org.apache.qpid.proton.engine.impl.ProtocolTracer; 101import org.apache.qpid.proton.engine.impl.TransportImpl; 102import org.apache.qpid.proton.framing.TransportFrame; 103import org.fusesource.hawtbuf.Buffer; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107/** 108 * Implements the mechanics of managing a single remote peer connection. 109 */ 110public class AmqpConnection implements AmqpProtocolConverter { 111 112 private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; 113 private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); 114 private static final int CHANNEL_MAX = 32767; 115 private static final String BROKER_VERSION; 116 private static final String BROKER_PLATFORM; 117 118 static { 119 String javaVersion = System.getProperty("java.version"); 120 121 BROKER_PLATFORM = "Java/" + (javaVersion == null ? "unknown" : javaVersion); 122 123 InputStream in = null; 124 String version = "5.12.0"; 125 if ((in = AmqpConnection.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { 126 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 127 try { 128 version = reader.readLine(); 129 } catch(Exception e) { 130 } 131 } 132 BROKER_VERSION = version; 133 } 134 135 private final Transport protonTransport = Proton.transport(); 136 private final Connection protonConnection = Proton.connection(); 137 private final Collector eventCollector = new CollectorImpl(); 138 139 private final AmqpTransport amqpTransport; 140 private final AmqpWireFormat amqpWireFormat; 141 private final BrokerService brokerService; 142 143 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 144 private final AtomicInteger lastCommandId = new AtomicInteger(); 145 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 146 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 147 private long nextSessionId; 148 private long nextTempDestinationId; 149 private long nextTransactionId; 150 private boolean closing; 151 private boolean closedSocket; 152 private AmqpAuthenticator authenticator; 153 154 private final Map<TransactionId, AmqpTransactionCoordinator> transactions = new HashMap<TransactionId, AmqpTransactionCoordinator>(); 155 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 156 private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>(); 157 158 public AmqpConnection(AmqpTransport transport, BrokerService brokerService) { 159 this.amqpTransport = transport; 160 161 AmqpInactivityMonitor monitor = transport.getInactivityMonitor(); 162 if (monitor != null) { 163 monitor.setAmqpTransport(amqpTransport); 164 } 165 166 this.amqpWireFormat = transport.getWireFormat(); 167 this.brokerService = brokerService; 168 169 // the configured maxFrameSize on the URI. 170 int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize(); 171 if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) { 172 this.protonTransport.setMaxFrameSize(maxFrameSize); 173 } 174 175 this.protonTransport.bind(this.protonConnection); 176 this.protonTransport.setChannelMax(CHANNEL_MAX); 177 this.protonTransport.setEmitFlowEventOnSend(false); 178 179 this.protonConnection.collect(eventCollector); 180 181 updateTracer(); 182 } 183 184 /** 185 * Load and return a <code>[]Symbol</code> that contains the connection capabilities 186 * offered to new connections 187 * 188 * @return the capabilities that are offered to new clients on connect. 189 */ 190 protected Symbol[] getConnectionCapabilitiesOffered() { 191 return new Symbol[]{ ANONYMOUS_RELAY }; 192 } 193 194 /** 195 * Load and return a <code>Map<Symbol, Object></code> that contains the properties 196 * that this connection supplies to incoming connections. 197 * 198 * @return the properties that are offered to the incoming connection. 199 */ 200 protected Map<Symbol, Object> getConnetionProperties() { 201 Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); 202 203 properties.put(QUEUE_PREFIX, "queue://"); 204 properties.put(TOPIC_PREFIX, "topic://"); 205 properties.put(PRODUCT, "ActiveMQ"); 206 properties.put(VERSION, BROKER_VERSION); 207 properties.put(PLATFORM, BROKER_PLATFORM); 208 209 return properties; 210 } 211 212 /** 213 * Load and return a <code>Map<Symbol, Object></code> that contains the properties 214 * that this connection supplies to incoming connections when the open has failed 215 * and the remote should expect a close to follow. 216 * 217 * @return the properties that are offered to the incoming connection. 218 */ 219 protected Map<Symbol, Object> getFailedConnetionProperties() { 220 Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); 221 222 properties.put(CONNECTION_OPEN_FAILED, true); 223 224 return properties; 225 } 226 227 @Override 228 public void updateTracer() { 229 if (amqpTransport.isTrace()) { 230 ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() { 231 @Override 232 public void receivedFrame(TransportFrame transportFrame) { 233 TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()); 234 } 235 236 @Override 237 public void sentFrame(TransportFrame transportFrame) { 238 TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()); 239 } 240 }); 241 } 242 } 243 244 @Override 245 public long keepAlive() throws IOException { 246 long rescheduleAt = 0l; 247 248 LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress()); 249 250 if (protonConnection.getLocalState() != EndpointState.CLOSED) { 251 // Using nano time since it is not related to the wall clock, which may change 252 long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); 253 rescheduleAt = protonTransport.tick(now) - now; 254 pumpProtonToSocket(); 255 if (protonTransport.isClosed()) { 256 rescheduleAt = 0; 257 LOG.debug("Transport closed after inactivity check."); 258 throw new InactivityIOException("Channel was inactive for to long"); 259 } 260 } 261 262 LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.", 263 amqpTransport.getRemoteAddress(), rescheduleAt); 264 265 return rescheduleAt; 266 } 267 268 //----- Connection Properties Accessors ----------------------------------// 269 270 /** 271 * @return the amount of credit assigned to AMQP receiver links created from 272 * sender links on the remote peer. 273 */ 274 public int getConfiguredReceiverCredit() { 275 return amqpWireFormat.getProducerCredit(); 276 } 277 278 /** 279 * @return the transformer type that was configured for this AMQP transport. 280 */ 281 public String getConfiguredTransformer() { 282 return amqpWireFormat.getTransformer(); 283 } 284 285 /** 286 * @return the ActiveMQ ConnectionId that identifies this AMQP Connection. 287 */ 288 public ConnectionId getConnectionId() { 289 return connectionId; 290 } 291 292 /** 293 * @return the Client ID used to create the connection with ActiveMQ 294 */ 295 public String getClientId() { 296 return connectionInfo.getClientId(); 297 } 298 299 /** 300 * @return the configured max frame size allowed for incoming messages. 301 */ 302 public long getMaxFrameSize() { 303 return amqpWireFormat.getMaxFrameSize(); 304 } 305 306 //----- Proton Event handling and IO support -----------------------------// 307 308 void pumpProtonToSocket() { 309 try { 310 boolean done = false; 311 while (!done) { 312 ByteBuffer toWrite = protonTransport.getOutputBuffer(); 313 if (toWrite != null && toWrite.hasRemaining()) { 314 LOG.trace("Sending {} bytes out", toWrite.limit()); 315 amqpTransport.sendToAmqp(toWrite); 316 protonTransport.outputConsumed(); 317 } else { 318 done = true; 319 } 320 } 321 } catch (IOException e) { 322 amqpTransport.onException(e); 323 } 324 } 325 326 @Override 327 public void onAMQPData(Object command) throws Exception { 328 Buffer frame; 329 if (command.getClass() == AmqpHeader.class) { 330 AmqpHeader header = (AmqpHeader) command; 331 332 if (amqpWireFormat.isHeaderValid(header)) { 333 LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header); 334 } else { 335 LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header); 336 AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader(); 337 amqpTransport.sendToAmqp(reply.getBuffer()); 338 handleException(new AmqpProtocolException( 339 "Connection from client using unsupported AMQP attempted", true)); 340 } 341 342 switch (header.getProtocolId()) { 343 case 0: 344 authenticator = null; 345 break; // nothing to do.. 346 case 3: // Client will be using SASL for auth.. 347 authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService); 348 break; 349 default: 350 } 351 frame = header.getBuffer(); 352 } else { 353 frame = (Buffer) command; 354 } 355 356 if (protonTransport.isClosed()) { 357 LOG.debug("Ignoring incoming AMQP data, transport is closed."); 358 return; 359 } 360 361 while (frame.length > 0) { 362 try { 363 int count = protonTransport.input(frame.data, frame.offset, frame.length); 364 frame.moveHead(count); 365 } catch (Throwable e) { 366 handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e)); 367 return; 368 } 369 370 if (authenticator != null) { 371 processSaslExchange(); 372 } else { 373 processProtonEvents(); 374 } 375 } 376 } 377 378 private void processSaslExchange() throws Exception { 379 authenticator.processSaslExchange(connectionInfo); 380 if (authenticator.isDone()) { 381 amqpTransport.getWireFormat().resetMagicRead(); 382 } 383 pumpProtonToSocket(); 384 } 385 386 private void processProtonEvents() throws Exception { 387 try { 388 Event event = null; 389 while ((event = eventCollector.peek()) != null) { 390 if (amqpTransport.isTrace()) { 391 LOG.trace("Processing event: {}", event.getType()); 392 } 393 switch (event.getType()) { 394 case CONNECTION_REMOTE_OPEN: 395 processConnectionOpen(event.getConnection()); 396 break; 397 case CONNECTION_REMOTE_CLOSE: 398 processConnectionClose(event.getConnection()); 399 break; 400 case SESSION_REMOTE_OPEN: 401 processSessionOpen(event.getSession()); 402 break; 403 case SESSION_REMOTE_CLOSE: 404 processSessionClose(event.getSession()); 405 break; 406 case LINK_REMOTE_OPEN: 407 processLinkOpen(event.getLink()); 408 break; 409 case LINK_REMOTE_DETACH: 410 processLinkDetach(event.getLink()); 411 break; 412 case LINK_REMOTE_CLOSE: 413 processLinkClose(event.getLink()); 414 break; 415 case LINK_FLOW: 416 processLinkFlow(event.getLink()); 417 break; 418 case DELIVERY: 419 processDelivery(event.getDelivery()); 420 break; 421 default: 422 break; 423 } 424 425 eventCollector.pop(); 426 } 427 428 } catch (Throwable e) { 429 handleException(new AmqpProtocolException("Could not process AMQP commands", true, e)); 430 } 431 432 pumpProtonToSocket(); 433 } 434 435 protected void processConnectionOpen(Connection connection) throws Exception { 436 437 stopConnectionTimeoutChecker(); 438 439 connectionInfo.setResponseRequired(true); 440 connectionInfo.setConnectionId(connectionId); 441 442 String clientId = protonConnection.getRemoteContainer(); 443 if (clientId != null && !clientId.isEmpty()) { 444 connectionInfo.setClientId(clientId); 445 } 446 447 connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); 448 449 if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) { 450 // We cannot meet the requested Idle processing because the inactivity monitor is 451 // disabled so we won't send idle frames to match the request. 452 protonConnection.setProperties(getFailedConnetionProperties()); 453 protonConnection.open(); 454 protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames")); 455 protonConnection.close(); 456 pumpProtonToSocket(); 457 458 amqpTransport.onException(new IOException( 459 "Connection failed, remote requested idle processing but inactivity monitoring is disbaled.")); 460 return; 461 } 462 463 sendToActiveMQ(connectionInfo, new ResponseHandler() { 464 @Override 465 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 466 Throwable exception = null; 467 try { 468 if (response.isException()) { 469 protonConnection.setProperties(getFailedConnetionProperties()); 470 protonConnection.open(); 471 472 exception = ((ExceptionResponse) response).getException(); 473 if (exception instanceof SecurityException) { 474 protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage())); 475 } else if (exception instanceof InvalidClientIDException) { 476 ErrorCondition condition = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()); 477 478 Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> (); 479 infoMap.put(INVALID_FIELD, CONTAINER_ID); 480 condition.setInfo(infoMap); 481 482 protonConnection.setCondition(condition); 483 } else { 484 protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage())); 485 } 486 487 protonConnection.close(); 488 } else { 489 490 if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) { 491 LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout()); 492 protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout()); 493 } 494 495 protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); 496 protonConnection.setProperties(getConnetionProperties()); 497 protonConnection.setContainer(brokerService.getBrokerName()); 498 protonConnection.open(); 499 500 configureInactivityMonitor(); 501 } 502 } finally { 503 pumpProtonToSocket(); 504 505 if (response.isException()) { 506 amqpTransport.onException(IOExceptionSupport.create(exception)); 507 } 508 } 509 } 510 }); 511 } 512 513 protected void processConnectionClose(Connection connection) throws Exception { 514 if (!closing) { 515 closing = true; 516 sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() { 517 @Override 518 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 519 protonConnection.close(); 520 protonConnection.free(); 521 522 if (!closedSocket) { 523 pumpProtonToSocket(); 524 } 525 } 526 }); 527 528 sendToActiveMQ(new ShutdownInfo()); 529 } 530 } 531 532 protected void processSessionOpen(Session protonSession) throws Exception { 533 new AmqpSession(this, getNextSessionId(), protonSession).open(); 534 } 535 536 protected void processSessionClose(Session protonSession) throws Exception { 537 if (protonSession.getContext() != null) { 538 ((AmqpResource) protonSession.getContext()).close(); 539 } else { 540 protonSession.close(); 541 protonSession.free(); 542 } 543 } 544 545 protected void processLinkOpen(Link link) throws Exception { 546 link.setSource(link.getRemoteSource()); 547 link.setTarget(link.getRemoteTarget()); 548 549 AmqpSession session = (AmqpSession) link.getSession().getContext(); 550 if (link instanceof Receiver) { 551 if (link.getRemoteTarget() instanceof Coordinator) { 552 session.createCoordinator((Receiver) link); 553 } else { 554 session.createReceiver((Receiver) link); 555 } 556 } else { 557 session.createSender((Sender) link); 558 } 559 } 560 561 protected void processLinkDetach(Link link) throws Exception { 562 Object context = link.getContext(); 563 564 if (context instanceof AmqpLink) { 565 ((AmqpLink) context).detach(); 566 } else { 567 link.detach(); 568 link.free(); 569 } 570 } 571 572 protected void processLinkClose(Link link) throws Exception { 573 Object context = link.getContext(); 574 575 if (context instanceof AmqpLink) { 576 ((AmqpLink) context).close();; 577 } else { 578 link.close(); 579 link.free(); 580 } 581 } 582 583 protected void processLinkFlow(Link link) throws Exception { 584 Object context = link.getContext(); 585 if (context instanceof AmqpLink) { 586 ((AmqpLink) context).flow(); 587 } 588 } 589 590 protected void processDelivery(Delivery delivery) throws Exception { 591 if (!delivery.isPartial()) { 592 Object context = delivery.getLink().getContext(); 593 if (context instanceof AmqpLink) { 594 AmqpLink amqpLink = (AmqpLink) context; 595 amqpLink.delivery(delivery); 596 } 597 } 598 } 599 600 //----- Event entry points for ActiveMQ commands and errors --------------// 601 602 @Override 603 public void onAMQPException(IOException error) { 604 closedSocket = true; 605 if (!closing) { 606 try { 607 closing = true; 608 // Attempt to inform the other end that we are going to close 609 // so that the client doesn't wait around forever. 610 protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage())); 611 protonConnection.close(); 612 pumpProtonToSocket(); 613 } catch (Exception ignore) { 614 } 615 amqpTransport.sendToActiveMQ(error); 616 } else { 617 try { 618 amqpTransport.stop(); 619 } catch (Exception ignore) { 620 } 621 } 622 } 623 624 @Override 625 public void onActiveMQCommand(Command command) throws Exception { 626 if (command.isResponse()) { 627 Response response = (Response) command; 628 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 629 if (rh != null) { 630 rh.onResponse(this, response); 631 } else { 632 // Pass down any unexpected errors. Should this close the connection? 633 if (response.isException()) { 634 Throwable exception = ((ExceptionResponse) response).getException(); 635 handleException(exception); 636 } 637 } 638 } else if (command.isMessageDispatch()) { 639 MessageDispatch dispatch = (MessageDispatch) command; 640 AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId()); 641 if (sender != null) { 642 // End of Queue Browse will have no Message object. 643 if (dispatch.getMessage() != null) { 644 LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId()); 645 } else { 646 LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId()); 647 } 648 sender.onMessageDispatch(dispatch); 649 if (dispatch.getMessage() != null) { 650 LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId()); 651 } 652 } 653 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 654 // Pass down any unexpected async errors. Should this close the connection? 655 Throwable exception = ((ConnectionError) command).getException(); 656 handleException(exception); 657 } else if (command.isConsumerControl()) { 658 ConsumerControl control = (ConsumerControl) command; 659 AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId()); 660 if (sender != null) { 661 sender.onConsumerControl(control); 662 } 663 } else if (command.isBrokerInfo()) { 664 // ignore 665 } else { 666 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 667 } 668 } 669 670 //----- Utility methods for connection resources to use ------------------// 671 672 void registerSender(ConsumerId consumerId, AmqpSender sender) { 673 subscriptionsByConsumerId.put(consumerId, sender); 674 } 675 676 void unregisterSender(ConsumerId consumerId) { 677 subscriptionsByConsumerId.remove(consumerId); 678 } 679 680 void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) { 681 transactions.put(txId, coordinator); 682 } 683 684 void unregisterTransaction(TransactionId txId) { 685 transactions.remove(txId); 686 } 687 688 AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) { 689 return transactions.get(txId); 690 } 691 692 LocalTransactionId getNextTransactionId() { 693 return new LocalTransactionId(getConnectionId(), ++nextTransactionId); 694 } 695 696 ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException { 697 ConsumerInfo result = null; 698 RegionBroker regionBroker; 699 700 try { 701 regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); 702 } catch (Exception e) { 703 throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e); 704 } 705 706 final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 707 DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId()); 708 if (subscription != null) { 709 result = subscription.getConsumerInfo(); 710 } 711 712 return result; 713 } 714 715 716 Subscription lookupPrefetchSubscription(ConsumerInfo consumerInfo) { 717 Subscription subscription = null; 718 try { 719 subscription = ((AbstractRegion)((RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class)).getRegion(consumerInfo.getDestination())).getSubscriptions().get(consumerInfo.getConsumerId()); 720 } catch (Exception e) { 721 LOG.warn("Error finding subscription for: " + consumerInfo + ": " + e.getMessage(), false, e); 722 } 723 return subscription; 724 } 725 726 ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) { 727 ActiveMQDestination rc = null; 728 if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) { 729 rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++); 730 } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) { 731 rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++); 732 } else { 733 LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue"); 734 rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++); 735 } 736 737 DestinationInfo info = new DestinationInfo(); 738 info.setConnectionId(connectionId); 739 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 740 info.setDestination(rc); 741 742 sendToActiveMQ(info, new ResponseHandler() { 743 744 @Override 745 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 746 if (response.isException()) { 747 link.setSource(null); 748 749 Throwable exception = ((ExceptionResponse) response).getException(); 750 if (exception instanceof SecurityException) { 751 link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage())); 752 } else { 753 link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); 754 } 755 756 link.close(); 757 link.free(); 758 } 759 } 760 }); 761 762 return rc; 763 } 764 765 void deleteTemporaryDestination(ActiveMQTempDestination destination) { 766 DestinationInfo info = new DestinationInfo(); 767 info.setConnectionId(connectionId); 768 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 769 info.setDestination(destination); 770 771 sendToActiveMQ(info, new ResponseHandler() { 772 773 @Override 774 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 775 if (response.isException()) { 776 Throwable exception = ((ExceptionResponse) response).getException(); 777 LOG.debug("Error during temp destination removeal: {}", exception.getMessage()); 778 } 779 } 780 }); 781 } 782 783 void sendToActiveMQ(Command command) { 784 sendToActiveMQ(command, null); 785 } 786 787 void sendToActiveMQ(Command command, ResponseHandler handler) { 788 command.setCommandId(lastCommandId.incrementAndGet()); 789 if (handler != null) { 790 command.setResponseRequired(true); 791 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 792 } 793 amqpTransport.sendToActiveMQ(command); 794 } 795 796 void handleException(Throwable exception) { 797 LOG.debug("Exception detail", exception); 798 if (exception instanceof AmqpProtocolException) { 799 onAMQPException((IOException) exception); 800 } else { 801 try { 802 // Must ensure that the broker removes Connection resources. 803 sendToActiveMQ(new ShutdownInfo()); 804 amqpTransport.stop(); 805 } catch (Throwable e) { 806 LOG.error("Failed to stop AMQP Transport ", e); 807 } 808 } 809 } 810 811 //----- Internal implementation ------------------------------------------// 812 813 private SessionId getNextSessionId() { 814 return new SessionId(connectionId, nextSessionId++); 815 } 816 817 private void stopConnectionTimeoutChecker() { 818 AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); 819 if (monitor != null) { 820 monitor.stopConnectionTimeoutChecker(); 821 } 822 } 823 824 private void configureInactivityMonitor() { 825 AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); 826 if (monitor == null) { 827 return; 828 } 829 830 // If either end has idle timeout requirements then the tick method 831 // will give us a deadline on the next time we need to tick() in order 832 // to meet those obligations. 833 // Using nano time since it is not related to the wall clock, which may change 834 long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); 835 long nextIdleCheck = protonTransport.tick(now); 836 if (nextIdleCheck > 0) { 837 long delay = nextIdleCheck - now; 838 LOG.trace("Connection keep-alive processing starts in: {}", delay); 839 monitor.startKeepAliveTask(delay); 840 } else { 841 LOG.trace("Connection does not require keep-alive processing"); 842 } 843 } 844}