001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStreamWriter; 024import java.io.PrintWriter; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.Map; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032import javax.jms.JMSException; 033 034import org.apache.activemq.ActiveMQPrefetchPolicy; 035import org.apache.activemq.advisory.AdvisorySupport; 036import org.apache.activemq.broker.BrokerContext; 037import org.apache.activemq.broker.BrokerContextAware; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTempQueue; 041import org.apache.activemq.command.ActiveMQTempTopic; 042import org.apache.activemq.command.Command; 043import org.apache.activemq.command.CommandTypes; 044import org.apache.activemq.command.ConnectionError; 045import org.apache.activemq.command.ConnectionId; 046import org.apache.activemq.command.ConnectionInfo; 047import org.apache.activemq.command.ConsumerControl; 048import org.apache.activemq.command.ConsumerId; 049import org.apache.activemq.command.ConsumerInfo; 050import org.apache.activemq.command.DestinationInfo; 051import org.apache.activemq.command.ExceptionResponse; 052import org.apache.activemq.command.LocalTransactionId; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageDispatch; 055import org.apache.activemq.command.MessageId; 056import org.apache.activemq.command.ProducerId; 057import org.apache.activemq.command.ProducerInfo; 058import org.apache.activemq.command.RemoveSubscriptionInfo; 059import org.apache.activemq.command.Response; 060import org.apache.activemq.command.SessionId; 061import org.apache.activemq.command.SessionInfo; 062import org.apache.activemq.command.ShutdownInfo; 063import org.apache.activemq.command.TransactionId; 064import org.apache.activemq.command.TransactionInfo; 065import org.apache.activemq.util.ByteArrayOutputStream; 066import org.apache.activemq.util.FactoryFinder; 067import org.apache.activemq.util.IOExceptionSupport; 068import org.apache.activemq.util.IdGenerator; 069import org.apache.activemq.util.IntrospectionSupport; 070import org.apache.activemq.util.LongSequenceGenerator; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074/** 075 * @author <a href="http://hiramchirino.com">chirino</a> 076 */ 077public class ProtocolConverter { 078 079 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class); 080 081 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 082 083 private static final String BROKER_VERSION; 084 private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE); 085 086 static { 087 InputStream in = null; 088 String version = "5.6.0"; 089 if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { 090 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 091 try { 092 version = reader.readLine(); 093 } catch(Exception e) { 094 } 095 } 096 BROKER_VERSION = version; 097 } 098 099 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 100 private final SessionId sessionId = new SessionId(connectionId, -1); 101 private final ProducerId producerId = new ProducerId(sessionId, 1); 102 103 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 104 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 105 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 106 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); 107 108 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 109 private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); 110 private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>(); 111 private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); 112 private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); 113 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); 114 private final StompTransport stompTransport; 115 116 private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>(); 117 private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); 118 119 private final Object commnadIdMutex = new Object(); 120 private int lastCommandId; 121 private final AtomicBoolean connected = new AtomicBoolean(false); 122 private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); 123 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); 124 private final BrokerContext brokerContext; 125 private String version = "1.0"; 126 private long hbReadInterval; 127 private long hbWriteInterval; 128 private float hbGracePeriodMultiplier = 1.0f; 129 private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; 130 131 private static class AckEntry { 132 133 private final String messageId; 134 private final StompSubscription subscription; 135 136 public AckEntry(String messageId, StompSubscription subscription) { 137 this.messageId = messageId; 138 this.subscription = subscription; 139 } 140 141 public MessageAck onMessageAck(TransactionId transactionId) { 142 return subscription.onStompMessageAck(messageId, transactionId); 143 } 144 145 public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { 146 return subscription.onStompMessageNack(messageId, transactionId); 147 } 148 149 public String getMessageId() { 150 return this.messageId; 151 } 152 153 @SuppressWarnings("unused") 154 public StompSubscription getSubscription() { 155 return this.subscription; 156 } 157 } 158 159 public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { 160 this.stompTransport = stompTransport; 161 this.brokerContext = brokerContext; 162 } 163 164 protected int generateCommandId() { 165 synchronized (commnadIdMutex) { 166 return lastCommandId++; 167 } 168 } 169 170 protected ResponseHandler createResponseHandler(final StompFrame command) { 171 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 172 if (receiptId != null) { 173 return new ResponseHandler() { 174 @Override 175 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 176 if (response.isException()) { 177 // Generally a command can fail.. but that does not invalidate the connection. 178 // We report back the failure but we don't close the connection. 179 Throwable exception = ((ExceptionResponse)response).getException(); 180 handleException(exception, command); 181 } else { 182 StompFrame sc = new StompFrame(); 183 sc.setAction(Stomp.Responses.RECEIPT); 184 sc.setHeaders(new HashMap<String, String>(1)); 185 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 186 stompTransport.sendToStomp(sc); 187 } 188 } 189 }; 190 } 191 return null; 192 } 193 194 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 195 command.setCommandId(generateCommandId()); 196 if (handler != null) { 197 command.setResponseRequired(true); 198 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 199 } 200 stompTransport.sendToActiveMQ(command); 201 } 202 203 protected void sendToStomp(StompFrame command) throws IOException { 204 stompTransport.sendToStomp(command); 205 } 206 207 protected FrameTranslator findTranslator(String header) { 208 return findTranslator(header, null, false); 209 } 210 211 protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) { 212 FrameTranslator translator = frameTranslator; 213 try { 214 if (header != null) { 215 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); 216 } else { 217 if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) { 218 translator = new JmsFrameTranslator(); 219 } 220 } 221 } catch (Exception ignore) { 222 // if anything goes wrong use the default translator 223 } 224 225 if (translator instanceof BrokerContextAware) { 226 ((BrokerContextAware)translator).setBrokerContext(brokerContext); 227 } 228 229 return translator; 230 } 231 232 /** 233 * Convert a STOMP command 234 * 235 * @param command 236 */ 237 public void onStompCommand(StompFrame command) throws IOException, JMSException { 238 try { 239 240 if (command.getClass() == StompFrameError.class) { 241 throw ((StompFrameError)command).getException(); 242 } 243 244 String action = command.getAction(); 245 if (action.startsWith(Stomp.Commands.SEND)) { 246 onStompSend(command); 247 } else if (action.startsWith(Stomp.Commands.ACK)) { 248 onStompAck(command); 249 } else if (action.startsWith(Stomp.Commands.NACK)) { 250 onStompNack(command); 251 } else if (action.startsWith(Stomp.Commands.BEGIN)) { 252 onStompBegin(command); 253 } else if (action.startsWith(Stomp.Commands.COMMIT)) { 254 onStompCommit(command); 255 } else if (action.startsWith(Stomp.Commands.ABORT)) { 256 onStompAbort(command); 257 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) { 258 onStompSubscribe(command); 259 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) { 260 onStompUnsubscribe(command); 261 } else if (action.startsWith(Stomp.Commands.CONNECT) || 262 action.startsWith(Stomp.Commands.STOMP)) { 263 onStompConnect(command); 264 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) { 265 onStompDisconnect(command); 266 } else { 267 throw new ProtocolException("Unknown STOMP action: " + action, true); 268 } 269 270 } catch (ProtocolException e) { 271 handleException(e, command); 272 // Some protocol errors can cause the connection to get closed. 273 if (e.isFatal()) { 274 getStompTransport().onException(e); 275 } 276 } 277 } 278 279 protected void handleException(Throwable exception, StompFrame command) throws IOException { 280 if (command == null) { 281 LOG.warn("Exception occurred while processing a command: {}", exception.toString()); 282 } else { 283 LOG.warn("Exception occurred processing: {} -> {}", safeGetAction(command), exception.toString()); 284 } 285 286 if (LOG.isDebugEnabled()) { 287 LOG.debug("Exception detail", exception); 288 } 289 290 if (command != null && LOG.isTraceEnabled()) { 291 LOG.trace("Command that caused the error: {}", command); 292 } 293 294 // Let the stomp client know about any protocol errors. 295 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 296 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); 297 exception.printStackTrace(stream); 298 stream.close(); 299 300 HashMap<String, String> headers = new HashMap<String, String>(); 301 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); 302 headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain"); 303 304 if (command != null) { 305 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 306 if (receiptId != null) { 307 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 308 } 309 } 310 311 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); 312 sendToStomp(errorMessage); 313 } 314 315 protected void onStompSend(StompFrame command) throws IOException, JMSException { 316 checkConnected(); 317 318 Map<String, String> headers = command.getHeaders(); 319 String destination = headers.get(Stomp.Headers.Send.DESTINATION); 320 if (destination == null) { 321 throw new ProtocolException("SEND received without a Destination specified!"); 322 } 323 324 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 325 headers.remove("transaction"); 326 327 ActiveMQMessage message = convertMessage(command); 328 329 message.setProducerId(producerId); 330 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); 331 message.setMessageId(id); 332 333 if (stompTx != null) { 334 TransactionId activemqTx = transactions.get(stompTx); 335 if (activemqTx == null) { 336 throw new ProtocolException("Invalid transaction id: " + stompTx); 337 } 338 message.setTransactionId(activemqTx); 339 } 340 341 message.onSend(); 342 message.beforeMarshall(null); 343 sendToActiveMQ(message, createResponseHandler(command)); 344 } 345 346 protected void onStompNack(StompFrame command) throws ProtocolException { 347 348 checkConnected(); 349 350 if (this.version.equals(Stomp.V1_0)) { 351 throw new ProtocolException("NACK received but connection is in v1.0 mode."); 352 } 353 354 Map<String, String> headers = command.getHeaders(); 355 356 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 357 if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) { 358 throw new ProtocolException("NACK received without a subscription id for acknowledge!"); 359 } 360 361 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 362 if (messageId == null && !this.version.equals(Stomp.V1_2)) { 363 throw new ProtocolException("NACK received without a message-id to acknowledge!"); 364 } 365 366 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 367 if (ackId == null && this.version.equals(Stomp.V1_2)) { 368 throw new ProtocolException("NACK received without an ack header to acknowledge!"); 369 } 370 371 TransactionId activemqTx = null; 372 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 373 if (stompTx != null) { 374 activemqTx = transactions.get(stompTx); 375 if (activemqTx == null) { 376 throw new ProtocolException("Invalid transaction id: " + stompTx); 377 } 378 } 379 380 boolean nacked = false; 381 382 if (ackId != null) { 383 AckEntry pendingAck = this.pedingAcks.remove(ackId); 384 if (pendingAck != null) { 385 messageId = pendingAck.getMessageId(); 386 MessageAck ack = pendingAck.onMessageNack(activemqTx); 387 if (ack != null) { 388 sendToActiveMQ(ack, createResponseHandler(command)); 389 nacked = true; 390 } 391 } 392 } else if (subscriptionId != null) { 393 StompSubscription sub = this.subscriptions.get(subscriptionId); 394 if (sub != null) { 395 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); 396 if (ack != null) { 397 sendToActiveMQ(ack, createResponseHandler(command)); 398 nacked = true; 399 } 400 } 401 } 402 403 if (!nacked) { 404 throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); 405 } 406 } 407 408 protected void onStompAck(StompFrame command) throws ProtocolException { 409 checkConnected(); 410 411 Map<String, String> headers = command.getHeaders(); 412 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 413 if (messageId == null && !(this.version.equals(Stomp.V1_2))) { 414 throw new ProtocolException("ACK received without a message-id to acknowledge!"); 415 } 416 417 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 418 if (subscriptionId == null && this.version.equals(Stomp.V1_1)) { 419 throw new ProtocolException("ACK received without a subscription id for acknowledge!"); 420 } 421 422 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 423 if (ackId == null && this.version.equals(Stomp.V1_2)) { 424 throw new ProtocolException("ACK received without a ack id for acknowledge!"); 425 } 426 427 TransactionId activemqTx = null; 428 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 429 if (stompTx != null) { 430 activemqTx = transactions.get(stompTx); 431 if (activemqTx == null) { 432 throw new ProtocolException("Invalid transaction id: " + stompTx); 433 } 434 } 435 436 boolean acked = false; 437 438 if (ackId != null) { 439 AckEntry pendingAck = this.pedingAcks.remove(ackId); 440 if (pendingAck != null) { 441 messageId = pendingAck.getMessageId(); 442 MessageAck ack = pendingAck.onMessageAck(activemqTx); 443 if (ack != null) { 444 sendToActiveMQ(ack, createResponseHandler(command)); 445 acked = true; 446 } 447 } 448 449 } else if (subscriptionId != null) { 450 StompSubscription sub = this.subscriptions.get(subscriptionId); 451 if (sub != null) { 452 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 453 if (ack != null) { 454 sendToActiveMQ(ack, createResponseHandler(command)); 455 acked = true; 456 } 457 } 458 } else { 459 // STOMP v1.0: acking with just a message id is very bogus since the same message id 460 // could have been sent to 2 different subscriptions on the same Stomp connection. 461 // For example, when 2 subs are created on the same topic. 462 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 463 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 464 if (ack != null) { 465 sendToActiveMQ(ack, createResponseHandler(command)); 466 acked = true; 467 break; 468 } 469 } 470 } 471 472 if (!acked) { 473 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); 474 } 475 } 476 477 protected void onStompBegin(StompFrame command) throws ProtocolException { 478 checkConnected(); 479 480 Map<String, String> headers = command.getHeaders(); 481 482 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 483 484 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { 485 throw new ProtocolException("Must specify the transaction you are beginning"); 486 } 487 488 if (transactions.get(stompTx) != null) { 489 throw new ProtocolException("The transaction was already started: " + stompTx); 490 } 491 492 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); 493 transactions.put(stompTx, activemqTx); 494 495 TransactionInfo tx = new TransactionInfo(); 496 tx.setConnectionId(connectionId); 497 tx.setTransactionId(activemqTx); 498 tx.setType(TransactionInfo.BEGIN); 499 500 sendToActiveMQ(tx, createResponseHandler(command)); 501 } 502 503 protected void onStompCommit(StompFrame command) throws ProtocolException { 504 checkConnected(); 505 506 Map<String, String> headers = command.getHeaders(); 507 508 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 509 if (stompTx == null) { 510 throw new ProtocolException("Must specify the transaction you are committing"); 511 } 512 513 TransactionId activemqTx = transactions.remove(stompTx); 514 if (activemqTx == null) { 515 throw new ProtocolException("Invalid transaction id: " + stompTx); 516 } 517 518 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 519 sub.onStompCommit(activemqTx); 520 } 521 522 pedingAcks.clear(); 523 524 TransactionInfo tx = new TransactionInfo(); 525 tx.setConnectionId(connectionId); 526 tx.setTransactionId(activemqTx); 527 tx.setType(TransactionInfo.COMMIT_ONE_PHASE); 528 529 sendToActiveMQ(tx, createResponseHandler(command)); 530 } 531 532 protected void onStompAbort(StompFrame command) throws ProtocolException { 533 checkConnected(); 534 Map<String, String> headers = command.getHeaders(); 535 536 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 537 if (stompTx == null) { 538 throw new ProtocolException("Must specify the transaction you are committing"); 539 } 540 541 TransactionId activemqTx = transactions.remove(stompTx); 542 if (activemqTx == null) { 543 throw new ProtocolException("Invalid transaction id: " + stompTx); 544 } 545 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 546 try { 547 sub.onStompAbort(activemqTx); 548 } catch (Exception e) { 549 throw new ProtocolException("Transaction abort failed", false, e); 550 } 551 } 552 553 pedingAcks.clear(); 554 555 TransactionInfo tx = new TransactionInfo(); 556 tx.setConnectionId(connectionId); 557 tx.setTransactionId(activemqTx); 558 tx.setType(TransactionInfo.ROLLBACK); 559 560 sendToActiveMQ(tx, createResponseHandler(command)); 561 } 562 563 protected void onStompSubscribe(StompFrame command) throws ProtocolException { 564 checkConnected(); 565 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); 566 Map<String, String> headers = command.getHeaders(); 567 568 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); 569 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); 570 571 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 572 throw new ProtocolException("SUBSCRIBE received without a subscription id!"); 573 } 574 575 final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true); 576 577 if (actualDest == null) { 578 throw new ProtocolException("Invalid 'null' Destination."); 579 } 580 581 final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 582 ConsumerInfo consumerInfo = new ConsumerInfo(id); 583 consumerInfo.setPrefetchSize(actualDest.isQueue() ? 584 ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH : 585 headers.containsKey("activemq.subscriptionName") ? 586 ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); 587 consumerInfo.setDispatchAsync(true); 588 589 String browser = headers.get(Stomp.Headers.Subscribe.BROWSER); 590 if (browser != null && browser.equals(Stomp.TRUE)) { 591 592 if (this.version.equals(Stomp.V1_0)) { 593 throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!"); 594 } 595 596 consumerInfo.setBrowser(true); 597 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH); 598 } 599 600 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); 601 if (selector != null) { 602 consumerInfo.setSelector("convert_string_expressions:" + selector); 603 } 604 605 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); 606 607 if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) { 608 throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!"); 609 } 610 611 consumerInfo.setDestination(actualDest); 612 613 StompSubscription stompSubscription; 614 if (!consumerInfo.isBrowser()) { 615 stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 616 } else { 617 stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 618 } 619 stompSubscription.setDestination(actualDest); 620 621 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); 622 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { 623 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); 624 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { 625 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); 626 } else { 627 stompSubscription.setAckMode(StompSubscription.AUTO_ACK); 628 } 629 630 subscriptionsByConsumerId.put(id, stompSubscription); 631 // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set. 632 if (subscriptionId != null) { 633 subscriptions.put(subscriptionId, stompSubscription); 634 } 635 636 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 637 if (receiptId != null && consumerInfo.getPrefetchSize() > 0) { 638 639 final StompFrame cmd = command; 640 final int prefetch = consumerInfo.getPrefetchSize(); 641 642 // Since dispatch could beat the receipt we set prefetch to zero to start and then 643 // once we've sent our Receipt we are safe to turn on dispatch if the response isn't 644 // an error message. 645 consumerInfo.setPrefetchSize(0); 646 647 final ResponseHandler handler = new ResponseHandler() { 648 @Override 649 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 650 if (response.isException()) { 651 // Generally a command can fail.. but that does not invalidate the connection. 652 // We report back the failure but we don't close the connection. 653 Throwable exception = ((ExceptionResponse)response).getException(); 654 handleException(exception, cmd); 655 } else { 656 StompFrame sc = new StompFrame(); 657 sc.setAction(Stomp.Responses.RECEIPT); 658 sc.setHeaders(new HashMap<String, String>(1)); 659 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 660 stompTransport.sendToStomp(sc); 661 662 ConsumerControl control = new ConsumerControl(); 663 control.setPrefetch(prefetch); 664 control.setDestination(actualDest); 665 control.setConsumerId(id); 666 667 sendToActiveMQ(control, null); 668 } 669 } 670 }; 671 672 sendToActiveMQ(consumerInfo, handler); 673 } else { 674 sendToActiveMQ(consumerInfo, createResponseHandler(command)); 675 } 676 } 677 678 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { 679 checkConnected(); 680 Map<String, String> headers = command.getHeaders(); 681 682 ActiveMQDestination destination = null; 683 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); 684 if (o != null) { 685 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true); 686 } 687 688 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); 689 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 690 throw new ProtocolException("UNSUBSCRIBE received without a subscription id!"); 691 } 692 693 if (subscriptionId == null && destination == null) { 694 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); 695 } 696 697 // check if it is a durable subscription 698 String durable = command.getHeaders().get("activemq.subscriptionName"); 699 String clientId = durable; 700 if (!this.version.equals(Stomp.V1_0)) { 701 clientId = connectionInfo.getClientId(); 702 } 703 704 if (durable != null) { 705 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 706 info.setClientId(clientId); 707 info.setSubscriptionName(durable); 708 info.setConnectionId(connectionId); 709 sendToActiveMQ(info, createResponseHandler(command)); 710 return; 711 } 712 713 if (subscriptionId != null) { 714 StompSubscription sub = this.subscriptions.remove(subscriptionId); 715 if (sub != null) { 716 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 717 return; 718 } 719 } else { 720 // Unsubscribing using a destination is a bit weird if multiple subscriptions 721 // are created with the same destination. 722 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 723 StompSubscription sub = iter.next(); 724 if (destination != null && destination.equals(sub.getDestination())) { 725 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 726 iter.remove(); 727 return; 728 } 729 } 730 } 731 732 throw new ProtocolException("No subscription matched."); 733 } 734 735 ConnectionInfo connectionInfo = new ConnectionInfo(); 736 737 protected void onStompConnect(final StompFrame command) throws ProtocolException { 738 739 if (connected.get()) { 740 throw new ProtocolException("Already connected."); 741 } 742 743 final Map<String, String> headers = command.getHeaders(); 744 745 // allow anyone to login for now 746 String login = headers.get(Stomp.Headers.Connect.LOGIN); 747 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE); 748 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID); 749 String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT); 750 751 if (heartBeat == null) { 752 heartBeat = defaultHeartBeat; 753 } 754 755 this.version = StompCodec.detectVersion(headers); 756 757 configureInactivityMonitor(heartBeat.trim()); 758 759 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); 760 connectionInfo.setConnectionId(connectionId); 761 if (clientId != null) { 762 connectionInfo.setClientId(clientId); 763 } else { 764 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 765 } 766 767 connectionInfo.setResponseRequired(true); 768 connectionInfo.setUserName(login); 769 connectionInfo.setPassword(passcode); 770 connectionInfo.setTransportContext(command.getTransportContext()); 771 772 sendToActiveMQ(connectionInfo, new ResponseHandler() { 773 @Override 774 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 775 776 if (response.isException()) { 777 // If the connection attempt fails we close the socket. 778 Throwable exception = ((ExceptionResponse)response).getException(); 779 handleException(exception, command); 780 getStompTransport().onException(IOExceptionSupport.create(exception)); 781 return; 782 } 783 784 final SessionInfo sessionInfo = new SessionInfo(sessionId); 785 sendToActiveMQ(sessionInfo, null); 786 787 final ProducerInfo producerInfo = new ProducerInfo(producerId); 788 sendToActiveMQ(producerInfo, new ResponseHandler() { 789 @Override 790 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 791 792 if (response.isException()) { 793 // If the connection attempt fails we close the socket. 794 Throwable exception = ((ExceptionResponse)response).getException(); 795 handleException(exception, command); 796 getStompTransport().onException(IOExceptionSupport.create(exception)); 797 } 798 799 connected.set(true); 800 HashMap<String, String> responseHeaders = new HashMap<String, String>(); 801 802 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); 803 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); 804 if (requestId == null) { 805 // TODO legacy 806 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED); 807 } 808 if (requestId != null) { 809 // TODO legacy 810 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); 811 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId); 812 } 813 814 responseHeaders.put(Stomp.Headers.Connected.VERSION, version); 815 responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT, 816 String.format("%d,%d", hbWriteInterval, hbReadInterval)); 817 responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION); 818 819 StompFrame sc = new StompFrame(); 820 sc.setAction(Stomp.Responses.CONNECTED); 821 sc.setHeaders(responseHeaders); 822 sendToStomp(sc); 823 824 StompWireFormat format = stompTransport.getWireFormat(); 825 if (format != null) { 826 format.setStompVersion(version); 827 } 828 } 829 }); 830 } 831 }); 832 } 833 834 protected void onStompDisconnect(StompFrame command) throws ProtocolException { 835 if (connected.get()) { 836 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); 837 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); 838 connected.set(false); 839 } 840 } 841 842 protected void checkConnected() throws ProtocolException { 843 if (!connected.get()) { 844 throw new ProtocolException("Not connected."); 845 } 846 } 847 848 /** 849 * Dispatch a ActiveMQ command 850 * 851 * @param command 852 * @throws IOException 853 */ 854 public void onActiveMQCommand(Command command) throws IOException, JMSException { 855 if (command.isResponse()) { 856 Response response = (Response)command; 857 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 858 if (rh != null) { 859 rh.onResponse(this, response); 860 } else { 861 // Pass down any unexpected errors. Should this close the connection? 862 if (response.isException()) { 863 Throwable exception = ((ExceptionResponse)response).getException(); 864 handleException(exception, null); 865 } 866 } 867 } else if (command.isMessageDispatch()) { 868 MessageDispatch md = (MessageDispatch)command; 869 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); 870 if (sub != null) { 871 String ackId = null; 872 if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) { 873 AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub); 874 ackId = this.ACK_ID_GENERATOR.generateId(); 875 this.pedingAcks.put(ackId, pendingAck); 876 } 877 try { 878 sub.onMessageDispatch(md, ackId); 879 } catch (Exception ex) { 880 if (ackId != null) { 881 this.pedingAcks.remove(ackId); 882 } 883 } 884 } 885 } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) { 886 stompTransport.sendToStomp(ping); 887 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 888 // Pass down any unexpected async errors. Should this close the connection? 889 Throwable exception = ((ConnectionError)command).getException(); 890 handleException(exception, null); 891 } 892 } 893 894 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { 895 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); 896 return msg; 897 } 898 899 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { 900 if (ignoreTransformation == true) { 901 return frameTranslator.convertMessage(this, message); 902 } else { 903 FrameTranslator translator = findTranslator( 904 message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory()); 905 return translator.convertMessage(this, message); 906 } 907 } 908 909 public StompTransport getStompTransport() { 910 return stompTransport; 911 } 912 913 public ActiveMQDestination createTempDestination(String name, boolean topic) { 914 ActiveMQDestination rc = tempDestinations.get(name); 915 if( rc == null ) { 916 if (topic) { 917 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); 918 } else { 919 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); 920 } 921 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 922 tempDestinations.put(name, rc); 923 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); 924 } 925 return rc; 926 } 927 928 public String getCreatedTempDestinationName(ActiveMQDestination destination) { 929 return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); 930 } 931 932 public String getDefaultHeartBeat() { 933 return defaultHeartBeat; 934 } 935 936 public void setDefaultHeartBeat(String defaultHeartBeat) { 937 this.defaultHeartBeat = defaultHeartBeat; 938 } 939 940 /** 941 * @return the hbGracePeriodMultiplier 942 */ 943 public float getHbGracePeriodMultiplier() { 944 return hbGracePeriodMultiplier; 945 } 946 947 /** 948 * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set 949 */ 950 public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) { 951 this.hbGracePeriodMultiplier = hbGracePeriodMultiplier; 952 } 953 954 protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { 955 956 String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); 957 958 if (keepAliveOpts == null || keepAliveOpts.length != 2) { 959 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 960 } else { 961 962 try { 963 hbReadInterval = (Long.parseLong(keepAliveOpts[0])); 964 hbWriteInterval = Long.parseLong(keepAliveOpts[1]); 965 } catch(NumberFormatException e) { 966 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 967 } 968 969 try { 970 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); 971 monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier)); 972 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); 973 monitor.setWriteCheckTime(hbWriteInterval); 974 monitor.startMonitoring(); 975 } catch(Exception ex) { 976 hbReadInterval = 0; 977 hbWriteInterval = 0; 978 } 979 980 if (LOG.isDebugEnabled()) { 981 LOG.debug("Stomp Connect heartbeat conf RW[{},{}]", hbReadInterval, hbWriteInterval); 982 } 983 } 984 } 985 986 protected void sendReceipt(StompFrame command) { 987 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 988 if (receiptId != null) { 989 StompFrame sc = new StompFrame(); 990 sc.setAction(Stomp.Responses.RECEIPT); 991 sc.setHeaders(new HashMap<String, String>(1)); 992 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 993 try { 994 sendToStomp(sc); 995 } catch (IOException e) { 996 LOG.warn("Could not send a receipt for {}", command, e); 997 } 998 } 999 } 1000 1001 /** 1002 * Retrieve the STOMP action value from a frame if the value is valid, otherwise 1003 * return an unknown string to allow for safe log output. 1004 * 1005 * @param command 1006 * The STOMP command to fetch an action from. 1007 * 1008 * @return the command action or a safe string to use in logging. 1009 */ 1010 protected Object safeGetAction(StompFrame command) { 1011 String result = "<Unknown>"; 1012 if (command != null && command.getAction() != null) { 1013 String action = command.getAction().trim(); 1014 1015 if (action != null) { 1016 switch (action) { 1017 case Stomp.Commands.SEND: 1018 case Stomp.Commands.ACK: 1019 case Stomp.Commands.NACK: 1020 case Stomp.Commands.BEGIN: 1021 case Stomp.Commands.COMMIT: 1022 case Stomp.Commands.ABORT: 1023 case Stomp.Commands.SUBSCRIBE: 1024 case Stomp.Commands.UNSUBSCRIBE: 1025 case Stomp.Commands.CONNECT: 1026 case Stomp.Commands.STOMP: 1027 case Stomp.Commands.DISCONNECT: 1028 result = action; 1029 break; 1030 default: 1031 break; 1032 } 1033 } 1034 } 1035 1036 return result; 1037 } 1038}