/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.protocol;

import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID;
import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD;
import static org.apache.activemq.transport.amqp.AmqpSupport.PLATFORM;
import static org.apache.activemq.transport.amqp.AmqpSupport.PRODUCT;
import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
import static org.apache.activemq.transport.amqp.AmqpSupport.VERSION;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.InvalidClientIDException;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.transport.amqp.AmqpTransport;
import org.apache.activemq.transport.amqp.AmqpTransportFilter;
import org.apache.activemq.transport.amqp.AmqpWireFormat;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements the mechanics of managing a single remote peer connection.
 */
public class AmqpConnection implements AmqpProtocolConverter {

    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
    private static final int CHANNEL_MAX = 32767;

    /** Version reported when the version resource is missing, unreadable or empty. */
    private static final String DEFAULT_BROKER_VERSION = "5.12.0";

    private static final String BROKER_VERSION;
    private static final String BROKER_PLATFORM;

    static {
        String javaVersion = System.getProperty("java.version");

        BROKER_PLATFORM = "Java/" + (javaVersion == null ? "unknown" : javaVersion);

        String version = DEFAULT_BROKER_VERSION;
        InputStream in = AmqpConnection.class.getResourceAsStream("/org/apache/activemq/version.txt");
        if (in != null) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            try {
                // readLine() returns null for an empty resource; keep the default
                // in that case rather than advertising a null version.
                String line = reader.readLine();
                if (line != null && !line.trim().isEmpty()) {
                    version = line;
                }
            } catch (Exception e) {
                // Best effort only: a failure here must never prevent the class
                // from loading, so fall back to the default version.
                LOG.debug("Could not read the broker version resource, using default: {}", version, e);
            } finally {
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if the close fails.
                }
            }
        }
        BROKER_VERSION = version;
    }

    private final Transport protonTransport = Proton.transport();
    private final Connection protonConnection = Proton.connection();
    private final Collector eventCollector = new CollectorImpl();

    private final AmqpTransport amqpTransport;
    private final AmqpWireFormat amqpWireFormat;
    private final BrokerService brokerService;

    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    private final AtomicInteger lastCommandId = new AtomicInteger();
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final ConnectionInfo connectionInfo = new ConnectionInfo();
    private long nextSessionId;
    private long nextTempDestinationId;
    private long nextTransactionId;
    private boolean closing;
    private boolean closedSocket;
    private AmqpAuthenticator authenticator;

    private final Map<TransactionId, AmqpTransactionCoordinator> transactions = new HashMap<TransactionId, AmqpTransactionCoordinator>();
    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();

    /**
     * Creates a new connection bound to the given transport, wiring the proton
     * transport, connection and event collector together.
     *
     * @param transport
     *        the AMQP transport that this connection reads from and writes to.
     * @param brokerService
     *        the broker service this connection belongs to.
     */
    public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
        this.amqpTransport = transport;

        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
        if (monitor != null) {
            monitor.setAmqpTransport(amqpTransport);
        }

        this.amqpWireFormat = transport.getWireFormat();
        this.brokerService = brokerService;

        // the configured maxFrameSize on the URI.
        int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
            this.protonTransport.setMaxFrameSize(maxFrameSize);
        }

        this.protonTransport.bind(this.protonConnection);
        this.protonTransport.setChannelMax(CHANNEL_MAX);
        this.protonTransport.setEmitFlowEventOnSend(false);

        this.protonConnection.collect(eventCollector);

        updateTracer();
    }

    /**
     * Load and return a <code>Symbol[]</code> that contains the connection capabilities
     * offered to new connections
     *
     * @return the capabilities that are offered to new clients on connect.
     */
    protected Symbol[] getConnectionCapabilitiesOffered() {
        return new Symbol[]{ ANONYMOUS_RELAY };
    }

    /**
     * Load and return a {@code Map<Symbol, Object>} that contains the properties
     * that this connection supplies to incoming connections.
     *
     * @return the properties that are offered to the incoming connection.
     */
    protected Map<Symbol, Object> getConnetionProperties() {
        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();

        properties.put(QUEUE_PREFIX, "queue://");
        properties.put(TOPIC_PREFIX, "topic://");
        properties.put(PRODUCT, "ActiveMQ");
        properties.put(VERSION, BROKER_VERSION);
        properties.put(PLATFORM, BROKER_PLATFORM);

        return properties;
    }

    /**
     * Load and return a {@code Map<Symbol, Object>} that contains the properties
     * that this connection supplies to incoming connections when the open has failed
     * and the remote should expect a close to follow.
     *
     * @return the properties that are offered to the incoming connection.
     */
    protected Map<Symbol, Object> getFailedConnetionProperties() {
        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();

        properties.put(CONNECTION_OPEN_FAILED, true);

        return properties;
    }

    /**
     * Installs a frame tracer on the proton transport when tracing is enabled on
     * the AMQP transport.  Does nothing when tracing is disabled.
     */
    @Override
    public void updateTracer() {
        if (amqpTransport.isTrace()) {
            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
                @Override
                public void receivedFrame(TransportFrame transportFrame) {
                    TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
                }

                @Override
                public void sentFrame(TransportFrame transportFrame) {
                    TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
                }
            });
        }
    }

    /**
     * Performs idle-timeout processing by ticking the proton transport and
     * flushing any output that produces.
     *
     * @return the delay in milliseconds until the next keep-alive pass is needed,
     *         or zero when the connection is locally closed or proton reported no
     *         deadline.  When a deadline exists the value is always at least one,
     *         so that an already-elapsed deadline is not mistaken for "no work".
     *
     * @throws IOException if the transport was closed by the inactivity check.
     */
    @Override
    public long keepAlive() throws IOException {
        long rescheduleAt = 0L;

        LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());

        if (protonConnection.getLocalState() != EndpointState.CLOSED) {
            // Using nano time since it is not related to the wall clock, which may change
            long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
            long deadline = protonTransport.tick(now);
            pumpProtonToSocket();
            if (protonTransport.isClosed()) {
                LOG.debug("Transport closed after inactivity check.");
                throw new InactivityIOException("Channel was inactive for to long");
            }

            // A deadline of zero means proton has nothing scheduled; subtracting
            // 'now' from it would yield a meaningless negative delay.
            if (deadline != 0) {
                rescheduleAt = Math.max(deadline - now, 1);
            }
        }

        LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.",
                  amqpTransport.getRemoteAddress(), rescheduleAt);

        return rescheduleAt;
    }

    //----- Connection Properties Accessors ----------------------------------//

    /**
     * @return the amount of credit assigned to AMQP receiver links created from
     *         sender links on the remote peer.
     */
    public int getConfiguredReceiverCredit() {
        return amqpWireFormat.getProducerCredit();
    }

    /**
     * @return the transformer type that was configured for this AMQP transport.
     */
    public String getConfiguredTransformer() {
        return amqpWireFormat.getTransformer();
    }

    /**
     * @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
     */
    public ConnectionId getConnectionId() {
        return connectionId;
    }

    /**
     * @return the Client ID used to create the connection with ActiveMQ
     */
    public String getClientId() {
        return connectionInfo.getClientId();
    }

    /**
     * @return the configured max frame size allowed for incoming messages.
     */
    public long getMaxFrameSize() {
        return amqpWireFormat.getMaxFrameSize();
    }

    //----- Proton Event handling and IO support -----------------------------//

    /**
     * Drains the proton transport's output buffer to the AMQP transport until no
     * more bytes are pending.  An {@link IOException} raised while writing is
     * reported to the AMQP transport rather than thrown.
     */
    void pumpProtonToSocket() {
        try {
            boolean done = false;
            while (!done) {
                ByteBuffer toWrite = protonTransport.getOutputBuffer();
                if (toWrite != null && toWrite.hasRemaining()) {
                    LOG.trace("Sending {} bytes out", toWrite.limit());
                    amqpTransport.sendToAmqp(toWrite);
                    protonTransport.outputConsumed();
                } else {
                    done = true;
                }
            }
        } catch (IOException e) {
            amqpTransport.onException(e);
        }
    }

    /**
     * Handles data read from the remote peer.  The command is either an
     * {@link AmqpHeader}, which selects whether SASL authentication is used, or a
     * {@link Buffer} of frame bytes.  The bytes are fed into the proton transport
     * and the resulting SASL exchange or proton events are processed.
     *
     * @param command
     *        the incoming {@link AmqpHeader} or {@link Buffer}.
     *
     * @throws Exception if an error occurs while processing the incoming data.
     */
    @Override
    public void onAMQPData(Object command) throws Exception {
        Buffer frame;
        if (command.getClass() == AmqpHeader.class) {
            AmqpHeader header = (AmqpHeader) command;

            if (amqpWireFormat.isHeaderValid(header)) {
                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
            } else {
                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
                amqpTransport.sendToAmqp(reply.getBuffer());
                handleException(new AmqpProtocolException(
                    "Connection from client using unsupported AMQP attempted", true));
            }

            switch (header.getProtocolId()) {
                case 0:
                    authenticator = null;
                    break; // nothing to do..
                case 3: // Client will be using SASL for auth..
                    authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService);
                    break;
                default:
            }
            frame = header.getBuffer();
        } else {
            frame = (Buffer) command;
        }

        if (protonTransport.isClosed()) {
            LOG.debug("Ignoring incoming AMQP data, transport is closed.");
            return;
        }

        while (frame.length > 0) {
            try {
                int count = protonTransport.input(frame.data, frame.offset, frame.length);
                frame.moveHead(count);
            } catch (Throwable e) {
                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
                return;
            }

            if (authenticator != null) {
                processSaslExchange();
            } else {
                processProtonEvents();
            }
        }
    }

    /**
     * Advances the SASL exchange, resets the wire format's magic-read state once
     * the authenticator reports it is done, and flushes pending output.
     */
    private void processSaslExchange() throws Exception {
        authenticator.processSaslExchange(connectionInfo);
        if (authenticator.isDone()) {
            amqpTransport.getWireFormat().resetMagicRead();
        }
        pumpProtonToSocket();
    }

    /**
     * Dispatches every event queued in the proton collector to its handler and
     * then flushes pending output.  Any failure while handling an event is
     * wrapped in an {@link AmqpProtocolException} and passed to
     * {@link #handleException(Throwable)}.
     */
    private void processProtonEvents() throws Exception {
        try {
            Event event = null;
            while ((event = eventCollector.peek()) != null) {
                if (amqpTransport.isTrace()) {
                    LOG.trace("Processing event: {}", event.getType());
                }
                switch (event.getType()) {
                    case CONNECTION_REMOTE_OPEN:
                        processConnectionOpen(event.getConnection());
                        break;
                    case CONNECTION_REMOTE_CLOSE:
                        processConnectionClose(event.getConnection());
                        break;
                    case SESSION_REMOTE_OPEN:
                        processSessionOpen(event.getSession());
                        break;
                    case SESSION_REMOTE_CLOSE:
                        processSessionClose(event.getSession());
                        break;
                    case LINK_REMOTE_OPEN:
                        processLinkOpen(event.getLink());
                        break;
                    case LINK_REMOTE_DETACH:
                        processLinkDetach(event.getLink());
                        break;
                    case LINK_REMOTE_CLOSE:
                        processLinkClose(event.getLink());
                        break;
                    case LINK_FLOW:
                        processLinkFlow(event.getLink());
                        break;
                    case DELIVERY:
                        processDelivery(event.getDelivery());
                        break;
                    default:
                        break;
                }

                eventCollector.pop();
            }

        } catch (Throwable e) {
            handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
        }

        pumpProtonToSocket();
    }

    /**
     * Handles the remote peer opening the connection: populates the connection
     * info, rejects the open when the peer requests idle processing that cannot
     * be honoured, and otherwise forwards the connection info to the broker and
     * completes the open (or failure) when the response arrives.
     *
     * @param connection
     *        the proton connection that was opened remotely.
     *
     * @throws Exception if an error occurs while processing the open.
     */
    protected void processConnectionOpen(Connection connection) throws Exception {

        stopConnectionTimeoutChecker();

        connectionInfo.setResponseRequired(true);
        connectionInfo.setConnectionId(connectionId);

        String clientId = protonConnection.getRemoteContainer();
        if (clientId != null && !clientId.isEmpty()) {
            connectionInfo.setClientId(clientId);
        }

        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());

        if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) {
            // We cannot meet the requested Idle processing because the inactivity monitor is
            // disabled so we won't send idle frames to match the request.
            protonConnection.setProperties(getFailedConnetionProperties());
            protonConnection.open();
            protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames"));
            protonConnection.close();
            pumpProtonToSocket();

            amqpTransport.onException(new IOException(
                "Connection failed, remote requested idle processing but inactivity monitoring is disbaled."));
            return;
        }

        sendToActiveMQ(connectionInfo, new ResponseHandler() {
            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                Throwable exception = null;
                try {
                    if (response.isException()) {
                        protonConnection.setProperties(getFailedConnetionProperties());
                        protonConnection.open();

                        exception = ((ExceptionResponse) response).getException();
                        if (exception instanceof SecurityException) {
                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
                        } else if (exception instanceof InvalidClientIDException) {
                            ErrorCondition condition = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());

                            Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
                            infoMap.put(INVALID_FIELD, CONTAINER_ID);
                            condition.setInfo(infoMap);

                            protonConnection.setCondition(condition);
                        } else {
                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
                        }

                        protonConnection.close();
                    } else {

                        if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
                            LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
                            protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
                        }

                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
                        protonConnection.setProperties(getConnetionProperties());
                        protonConnection.setContainer(brokerService.getBrokerName());
                        protonConnection.open();

                        configureInactivityMonitor();
                    }
                } finally {
                    pumpProtonToSocket();

                    if (response.isException()) {
                        amqpTransport.onException(IOExceptionSupport.create(exception));
                    }
                }
            }
        });
    }

    /**
     * Handles the remote peer closing the connection.  On the first call this
     * asks the broker to remove the connection, closes and frees the proton
     * connection once the broker responds, and sends a shutdown to the broker.
     * Subsequent calls are ignored.
     *
     * @param connection
     *        the proton connection that was closed remotely.
     *
     * @throws Exception if an error occurs while processing the close.
     */
    protected void processConnectionClose(Connection connection) throws Exception {
        if (!closing) {
            closing = true;
            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
                @Override
                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                    protonConnection.close();
                    protonConnection.free();

                    if (!closedSocket) {
                        pumpProtonToSocket();
                    }
                }
            });

            sendToActiveMQ(new ShutdownInfo());
        }
    }

    /**
     * Creates and opens an {@link AmqpSession} for a remotely opened session.
     *
     * @param protonSession
     *        the proton session that was opened remotely.
     *
     * @throws Exception if an error occurs while opening the session.
     */
    protected void processSessionOpen(Session protonSession) throws Exception {
        new AmqpSession(this, getNextSessionId(), protonSession).open();
    }

    /**
     * Closes the resource attached to a remotely closed session, or closes and
     * frees the proton session directly when no resource is attached.
     *
     * @param protonSession
     *        the proton session that was closed remotely.
     *
     * @throws Exception if an error occurs while closing the session.
     */
    protected void processSessionClose(Session protonSession) throws Exception {
        if (protonSession.getContext() != null) {
            ((AmqpResource) protonSession.getContext()).close();
        } else {
            protonSession.close();
            protonSession.free();
        }
    }

    /**
     * Handles a remotely opened link by mirroring the remote source and target
     * locally and asking the owning session to create a coordinator, receiver or
     * sender for it.
     *
     * @param link
     *        the proton link that was opened remotely.
     *
     * @throws Exception if an error occurs while creating the link resource.
     */
    protected void processLinkOpen(Link link) throws Exception {
        link.setSource(link.getRemoteSource());
        link.setTarget(link.getRemoteTarget());

        AmqpSession session = (AmqpSession) link.getSession().getContext();
        if (link instanceof Receiver) {
            if (link.getRemoteTarget() instanceof Coordinator) {
                session.createCoordinator((Receiver) link);
            } else {
                session.createReceiver((Receiver) link);
            }
        } else {
            session.createSender((Sender) link);
        }
    }

    /**
     * Detaches the {@link AmqpLink} attached to a remotely detached link, or
     * detaches and frees the proton link directly when none is attached.
     *
     * @param link
     *        the proton link that was detached remotely.
     *
     * @throws Exception if an error occurs while detaching the link.
     */
    protected void processLinkDetach(Link link) throws Exception {
        Object context = link.getContext();

        if (context instanceof AmqpLink) {
            ((AmqpLink) context).detach();
        } else {
            link.detach();
            link.free();
        }
    }

    /**
     * Closes the {@link AmqpLink} attached to a remotely closed link, or closes
     * and frees the proton link directly when none is attached.
     *
     * @param link
     *        the proton link that was closed remotely.
     *
     * @throws Exception if an error occurs while closing the link.
     */
    protected void processLinkClose(Link link) throws Exception {
        Object context = link.getContext();

        if (context instanceof AmqpLink) {
            ((AmqpLink) context).close();
        } else {
            link.close();
            link.free();
        }
    }

    /**
     * Notifies the {@link AmqpLink} attached to the given link, if any, of a
     * flow event.
     *
     * @param link
     *        the proton link whose flow state changed.
     *
     * @throws Exception if an error occurs while processing the flow.
     */
    protected void processLinkFlow(Link link) throws Exception {
        Object context = link.getContext();
        if (context instanceof AmqpLink) {
            ((AmqpLink) context).flow();
        }
    }

    /**
     * Passes a delivery to the {@link AmqpLink} attached to its link.  Partial
     * deliveries, and deliveries on links without an attached {@link AmqpLink},
     * are ignored.
     *
     * @param delivery
     *        the proton delivery to process.
     *
     * @throws Exception if an error occurs while processing the delivery.
     */
    protected void processDelivery(Delivery delivery) throws Exception {
        if (!delivery.isPartial()) {
            Object context = delivery.getLink().getContext();
            if (context instanceof AmqpLink) {
                AmqpLink amqpLink = (AmqpLink) context;
                amqpLink.delivery(delivery);
            }
        }
    }

    //----- Event entry points for ActiveMQ commands and errors --------------//

    /**
     * Handles an error on the AMQP side of the connection.  If the connection is
     * not already closing, the remote peer is told about the error on a best
     * effort basis and the error is forwarded to the broker; otherwise the
     * transport is stopped.
     *
     * @param error
     *        the error that was encountered.
     */
    @Override
    public void onAMQPException(IOException error) {
        closedSocket = true;
        if (!closing) {
            try {
                closing = true;
                // Attempt to inform the other end that we are going to close
                // so that the client doesn't wait around forever.
                protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
                protonConnection.close();
                pumpProtonToSocket();
            } catch (Exception ignore) {
                // Best effort notification only, the error is still forwarded below.
            }
            amqpTransport.sendToActiveMQ(error);
        } else {
            try {
                amqpTransport.stop();
            } catch (Exception ignore) {
                // Already shutting down, nothing more can be done here.
            }
        }
    }

    /**
     * Handles a command arriving from the broker: responses are routed to their
     * registered {@link ResponseHandler}, message dispatches and consumer
     * controls to the sender registered for the consumer, and errors to
     * {@link #handleException(Throwable)}.
     *
     * @param command
     *        the command received from the broker.
     *
     * @throws Exception if an error occurs while processing the command.
     */
    @Override
    public void onActiveMQCommand(Command command) throws Exception {
        if (command.isResponse()) {
            Response response = (Response) command;
            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
            if (rh != null) {
                rh.onResponse(this, response);
            } else {
                // Pass down any unexpected errors. Should this close the connection?
                if (response.isException()) {
                    Throwable exception = ((ExceptionResponse) response).getException();
                    handleException(exception);
                }
            }
        } else if (command.isMessageDispatch()) {
            MessageDispatch dispatch = (MessageDispatch) command;
            AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
            if (sender != null) {
                // End of Queue Browse will have no Message object.
                if (dispatch.getMessage() != null) {
                    LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
                } else {
                    LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
                }
                sender.onMessageDispatch(dispatch);
                if (dispatch.getMessage() != null) {
                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
                }
            }
        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
            // Pass down any unexpected async errors. Should this close the connection?
            Throwable exception = ((ConnectionError) command).getException();
            handleException(exception);
        } else if (command.isConsumerControl()) {
            ConsumerControl control = (ConsumerControl) command;
            AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId());
            if (sender != null) {
                sender.onConsumerControl(control);
            }
        } else if (command.isBrokerInfo()) {
            // ignore
        } else {
            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
        }
    }

    //----- Utility methods for connection resources to use ------------------//

    /** Registers the sender that receives dispatches for the given consumer. */
    void registerSender(ConsumerId consumerId, AmqpSender sender) {
        subscriptionsByConsumerId.put(consumerId, sender);
    }

    /** Removes the sender registered for the given consumer, if any. */
    void unregisterSender(ConsumerId consumerId) {
        subscriptionsByConsumerId.remove(consumerId);
    }

    /** Associates the given transaction with the coordinator that manages it. */
    void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) {
        transactions.put(txId, coordinator);
    }

    /** Removes the coordinator association for the given transaction, if any. */
    void unregisterTransaction(TransactionId txId) {
        transactions.remove(txId);
    }

    /**
     * @return the coordinator registered for the given transaction, or null if
     *         none is registered.
     */
    AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) {
        return transactions.get(txId);
    }

    /**
     * @return a new local transaction id scoped to this connection.
     */
    LocalTransactionId getNextTransactionId() {
        return new LocalTransactionId(getConnectionId(), ++nextTransactionId);
    }

    /**
     * Looks up the durable topic subscription with the given name for this
     * connection's client id.
     *
     * @param subscriptionName
     *        the name of the subscription to find.
     *
     * @return the consumer info of the subscription, or null if none was found.
     *
     * @throws AmqpProtocolException if the region broker cannot be obtained.
     */
    ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException {
        ConsumerInfo result = null;
        RegionBroker regionBroker;

        try {
            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
        } catch (Exception e) {
            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
        }

        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
        if (subscription != null) {
            result = subscription.getConsumerInfo();
        }

        return result;
    }

    /**
     * Creates a temporary destination for a dynamic link and asks the broker to
     * add it.  A temporary topic is created when the capabilities request one,
     * otherwise a temporary queue is created.  If the broker rejects the
     * destination the link is closed with an error condition.
     *
     * @param link
     *        the link that requested the dynamic destination.
     * @param capabilities
     *        the capabilities supplied with the request.
     *
     * @return the newly created temporary destination.
     */
    ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
        ActiveMQDestination rc = null;
        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
        } else {
            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
        }

        DestinationInfo info = new DestinationInfo();
        info.setConnectionId(connectionId);
        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
        info.setDestination(rc);

        sendToActiveMQ(info, new ResponseHandler() {

            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                if (response.isException()) {
                    link.setSource(null);

                    Throwable exception = ((ExceptionResponse) response).getException();
                    if (exception instanceof SecurityException) {
                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
                    } else {
                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
                    }

                    link.close();
                    link.free();
                }
            }
        });

        return rc;
    }

    /**
     * Asks the broker to remove the given temporary destination.  A failure
     * reported by the broker is only logged.
     *
     * @param destination
     *        the temporary destination to remove.
     */
    void deleteTemporaryDestination(ActiveMQTempDestination destination) {
        DestinationInfo info = new DestinationInfo();
        info.setConnectionId(connectionId);
        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
        info.setDestination(destination);

        sendToActiveMQ(info, new ResponseHandler() {

            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                if (response.isException()) {
                    Throwable exception = ((ExceptionResponse) response).getException();
                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
                }
            }
        });
    }

    /**
     * Sends a command to the broker without registering a response handler.
     *
     * @param command
     *        the command to send.
     */
    void sendToActiveMQ(Command command) {
        sendToActiveMQ(command, null);
    }

    /**
     * Assigns the next command id to the command and sends it to the broker.
     * When a handler is supplied the command is marked as requiring a response
     * and the handler is registered under the command id.
     *
     * @param command
     *        the command to send.
     * @param handler
     *        the handler to invoke with the response, or null if none is needed.
     */
    void sendToActiveMQ(Command command, ResponseHandler handler) {
        command.setCommandId(lastCommandId.incrementAndGet());
        if (handler != null) {
            command.setResponseRequired(true);
            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
        }
        amqpTransport.sendToActiveMQ(command);
    }

    /**
     * Handles an error raised while processing the connection.  Protocol errors
     * are routed through {@link #onAMQPException(IOException)}; for any other
     * error a shutdown is sent to the broker and the transport is stopped.
     *
     * @param exception
     *        the error that was encountered.
     */
    void handleException(Throwable exception) {
        LOG.debug("Exception detail", exception);
        if (exception instanceof AmqpProtocolException) {
            onAMQPException((IOException) exception);
        } else {
            try {
                // Must ensure that the broker removes Connection resources.
                sendToActiveMQ(new ShutdownInfo());
                amqpTransport.stop();
            } catch (Throwable e) {
                LOG.error("Failed to stop AMQP Transport ", e);
            }
        }
    }

    //----- Internal implementation ------------------------------------------//

    private SessionId getNextSessionId() {
        return new SessionId(connectionId, nextSessionId++);
    }

    private void stopConnectionTimeoutChecker() {
        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
        if (monitor != null) {
            monitor.stopConnectionTimeoutChecker();
        }
    }

    private void configureInactivityMonitor() {
        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
        if (monitor == null) {
            return;
        }

        // If either end has idle timeout requirements then the tick method
        // will give us a deadline on the next time we need to tick() in order
        // to meet those obligations.
        // Using nano time since it is not related to the wall clock, which may change
        long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        long nextIdleCheck = protonTransport.tick(now);
        if (nextIdleCheck > 0) {
            long delay = nextIdleCheck - now;
            LOG.trace("Connection keep-alive processing starts in: {}", delay);
            monitor.startKeepAliveTask(delay);
        } else {
            LOG.trace("Connection does not require keep-alive processing");
        }
    }
}