001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStreamWriter; 024import java.io.PrintWriter; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.Map; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032import javax.jms.JMSException; 033 034import org.apache.activemq.ActiveMQPrefetchPolicy; 035import org.apache.activemq.advisory.AdvisorySupport; 036import org.apache.activemq.broker.BrokerContext; 037import org.apache.activemq.broker.BrokerContextAware; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTempQueue; 041import org.apache.activemq.command.ActiveMQTempTopic; 042import org.apache.activemq.command.Command; 043import org.apache.activemq.command.CommandTypes; 044import org.apache.activemq.command.ConnectionError; 045import org.apache.activemq.command.ConnectionId; 046import org.apache.activemq.command.ConnectionInfo; 047import org.apache.activemq.command.ConsumerControl; 048import org.apache.activemq.command.ConsumerId; 049import org.apache.activemq.command.ConsumerInfo; 050import org.apache.activemq.command.DestinationInfo; 051import org.apache.activemq.command.ExceptionResponse; 052import org.apache.activemq.command.LocalTransactionId; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageDispatch; 055import org.apache.activemq.command.MessageId; 056import org.apache.activemq.command.ProducerId; 057import org.apache.activemq.command.ProducerInfo; 058import org.apache.activemq.command.RemoveSubscriptionInfo; 059import org.apache.activemq.command.Response; 060import org.apache.activemq.command.SessionId; 061import org.apache.activemq.command.SessionInfo; 062import org.apache.activemq.command.ShutdownInfo; 063import org.apache.activemq.command.TransactionId; 064import org.apache.activemq.command.TransactionInfo; 065import org.apache.activemq.util.ByteArrayOutputStream; 066import org.apache.activemq.util.FactoryFinder; 067import org.apache.activemq.util.IOExceptionSupport; 068import org.apache.activemq.util.IdGenerator; 069import org.apache.activemq.util.IntrospectionSupport; 070import org.apache.activemq.util.LongSequenceGenerator; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074/** 075 * @author <a href="http://hiramchirino.com">chirino</a> 076 */ 077public class ProtocolConverter { 078 079 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class); 080 081 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 082 083 private static final String BROKER_VERSION; 084 private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE); 085 086 static { 087 String version = "5.6.0"; 088 try(InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) { 089 if (in != null) { 090 try(InputStreamReader isr = new InputStreamReader(in); 091 BufferedReader reader = new BufferedReader(isr)) { 092 version = reader.readLine(); 093 } 094 } 095 }catch(Exception e) { 096 } 097 098 BROKER_VERSION = version; 099 } 100 101 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 102 private final SessionId sessionId = new SessionId(connectionId, -1); 103 private final ProducerId producerId = new ProducerId(sessionId, 1); 104 105 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 106 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 107 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 108 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); 109 110 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 111 private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); 112 private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>(); 113 private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); 114 private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); 115 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); 116 private final StompTransport stompTransport; 117 118 private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>(); 119 private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); 120 121 private final Object commnadIdMutex = new Object(); 122 private int lastCommandId; 123 private final AtomicBoolean connected = new AtomicBoolean(false); 124 private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); 125 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); 126 private final BrokerContext brokerContext; 127 private String version = "1.0"; 128 private long hbReadInterval; 129 private long hbWriteInterval; 130 private float hbGracePeriodMultiplier = 1.0f; 131 private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; 132 133 private static class AckEntry { 134 135 private final String messageId; 136 private final StompSubscription subscription; 137 138 public AckEntry(String messageId, StompSubscription subscription) { 139 this.messageId = messageId; 140 this.subscription = subscription; 141 } 142 143 public MessageAck onMessageAck(TransactionId transactionId) { 144 return subscription.onStompMessageAck(messageId, transactionId); 145 } 146 147 public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { 148 return subscription.onStompMessageNack(messageId, transactionId); 149 } 150 151 public String getMessageId() { 152 return this.messageId; 153 } 154 155 @SuppressWarnings("unused") 156 public StompSubscription getSubscription() { 157 return this.subscription; 158 } 159 } 160 161 public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { 162 this.stompTransport = stompTransport; 163 this.brokerContext = brokerContext; 164 } 165 166 protected int generateCommandId() { 167 synchronized (commnadIdMutex) { 168 return lastCommandId++; 169 } 170 } 171 172 protected ResponseHandler createResponseHandler(final StompFrame command) { 173 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 174 if (receiptId != null) { 175 return new ResponseHandler() { 176 @Override 177 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 178 if (response.isException()) { 179 // Generally a command can fail.. but that does not invalidate the connection. 180 // We report back the failure but we don't close the connection. 181 Throwable exception = ((ExceptionResponse)response).getException(); 182 handleException(exception, command); 183 } else { 184 StompFrame sc = new StompFrame(); 185 sc.setAction(Stomp.Responses.RECEIPT); 186 sc.setHeaders(new HashMap<String, String>(1)); 187 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 188 stompTransport.sendToStomp(sc); 189 } 190 } 191 }; 192 } 193 return null; 194 } 195 196 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 197 command.setCommandId(generateCommandId()); 198 if (handler != null) { 199 command.setResponseRequired(true); 200 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 201 } 202 stompTransport.sendToActiveMQ(command); 203 } 204 205 protected void sendToStomp(StompFrame command) throws IOException { 206 stompTransport.sendToStomp(command); 207 } 208 209 protected FrameTranslator findTranslator(String header) { 210 return findTranslator(header, null, false); 211 } 212 213 protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) { 214 FrameTranslator translator = frameTranslator; 215 try { 216 if (header != null) { 217 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); 218 } else { 219 if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) { 220 translator = new JmsFrameTranslator(); 221 } 222 } 223 } catch (Exception ignore) { 224 // if anything goes wrong use the default translator 225 } 226 227 if (translator instanceof BrokerContextAware) { 228 ((BrokerContextAware)translator).setBrokerContext(brokerContext); 229 } 230 231 return translator; 232 } 233 234 /** 235 * Convert a STOMP command 236 * 237 * @param command 238 */ 239 public void onStompCommand(StompFrame command) throws IOException, JMSException { 240 try { 241 242 if (command.getClass() == StompFrameError.class) { 243 throw ((StompFrameError)command).getException(); 244 } 245 246 String action = command.getAction(); 247 if (action.startsWith(Stomp.Commands.SEND)) { 248 onStompSend(command); 249 } else if (action.startsWith(Stomp.Commands.ACK)) { 250 onStompAck(command); 251 } else if (action.startsWith(Stomp.Commands.NACK)) { 252 onStompNack(command); 253 } else if (action.startsWith(Stomp.Commands.BEGIN)) { 254 onStompBegin(command); 255 } else if (action.startsWith(Stomp.Commands.COMMIT)) { 256 onStompCommit(command); 257 } else if (action.startsWith(Stomp.Commands.ABORT)) { 258 onStompAbort(command); 259 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) { 260 onStompSubscribe(command); 261 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) { 262 onStompUnsubscribe(command); 263 } else if (action.startsWith(Stomp.Commands.CONNECT) || 264 action.startsWith(Stomp.Commands.STOMP)) { 265 onStompConnect(command); 266 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) { 267 onStompDisconnect(command); 268 } else { 269 throw new ProtocolException("Unknown STOMP action: " + action, true); 270 } 271 272 } catch (ProtocolException e) { 273 handleException(e, command); 274 // Some protocol errors can cause the connection to get closed. 275 if (e.isFatal()) { 276 getStompTransport().onException(e); 277 } 278 } 279 } 280 281 protected void handleException(Throwable exception, StompFrame command) throws IOException { 282 if (command == null) { 283 LOG.warn("Exception occurred while processing a command: {}", exception.toString()); 284 } else { 285 LOG.warn("Exception occurred processing: {} -> {}", safeGetAction(command), exception.toString()); 286 } 287 288 if (LOG.isDebugEnabled()) { 289 LOG.debug("Exception detail", exception); 290 } 291 292 if (command != null && LOG.isTraceEnabled()) { 293 LOG.trace("Command that caused the error: {}", command); 294 } 295 296 // Let the stomp client know about any protocol errors. 297 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 298 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); 299 if (exception instanceof SecurityException || exception.getCause() instanceof SecurityException) { 300 stream.write(exception.getLocalizedMessage()); 301 } else { 302 exception.printStackTrace(stream); 303 } 304 stream.close(); 305 306 HashMap<String, String> headers = new HashMap<String, String>(); 307 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); 308 headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain"); 309 310 if (command != null) { 311 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 312 if (receiptId != null) { 313 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 314 } 315 } 316 317 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); 318 sendToStomp(errorMessage); 319 } 320 321 protected void onStompSend(StompFrame command) throws IOException, JMSException { 322 checkConnected(); 323 324 Map<String, String> headers = command.getHeaders(); 325 String destination = headers.get(Stomp.Headers.Send.DESTINATION); 326 if (destination == null) { 327 throw new ProtocolException("SEND received without a Destination specified!"); 328 } 329 330 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 331 headers.remove("transaction"); 332 333 ActiveMQMessage message = convertMessage(command); 334 335 message.setProducerId(producerId); 336 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); 337 message.setMessageId(id); 338 339 if (stompTx != null) { 340 TransactionId activemqTx = transactions.get(stompTx); 341 if (activemqTx == null) { 342 throw new ProtocolException("Invalid transaction id: " + stompTx); 343 } 344 message.setTransactionId(activemqTx); 345 } 346 347 message.onSend(); 348 message.beforeMarshall(null); 349 sendToActiveMQ(message, createResponseHandler(command)); 350 } 351 352 protected void onStompNack(StompFrame command) throws ProtocolException { 353 354 checkConnected(); 355 356 if (this.version.equals(Stomp.V1_0)) { 357 throw new ProtocolException("NACK received but connection is in v1.0 mode."); 358 } 359 360 Map<String, String> headers = command.getHeaders(); 361 362 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 363 if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) { 364 throw new ProtocolException("NACK received without a subscription id for acknowledge!"); 365 } 366 367 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 368 if (messageId == null && !this.version.equals(Stomp.V1_2)) { 369 throw new ProtocolException("NACK received without a message-id to acknowledge!"); 370 } 371 372 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 373 if (ackId == null && this.version.equals(Stomp.V1_2)) { 374 throw new ProtocolException("NACK received without an ack header to acknowledge!"); 375 } 376 377 TransactionId activemqTx = null; 378 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 379 if (stompTx != null) { 380 activemqTx = transactions.get(stompTx); 381 if (activemqTx == null) { 382 throw new ProtocolException("Invalid transaction id: " + stompTx); 383 } 384 } 385 386 boolean nacked = false; 387 388 if (ackId != null) { 389 AckEntry pendingAck = this.pedingAcks.remove(ackId); 390 if (pendingAck != null) { 391 messageId = pendingAck.getMessageId(); 392 MessageAck ack = pendingAck.onMessageNack(activemqTx); 393 if (ack != null) { 394 sendToActiveMQ(ack, createResponseHandler(command)); 395 nacked = true; 396 } 397 } 398 } else if (subscriptionId != null) { 399 StompSubscription sub = this.subscriptions.get(subscriptionId); 400 if (sub != null) { 401 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); 402 if (ack != null) { 403 sendToActiveMQ(ack, createResponseHandler(command)); 404 nacked = true; 405 } 406 } 407 } 408 409 if (!nacked) { 410 throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); 411 } 412 } 413 414 protected void onStompAck(StompFrame command) throws ProtocolException { 415 checkConnected(); 416 417 Map<String, String> headers = command.getHeaders(); 418 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 419 if (messageId == null && !(this.version.equals(Stomp.V1_2))) { 420 throw new ProtocolException("ACK received without a message-id to acknowledge!"); 421 } 422 423 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 424 if (subscriptionId == null && this.version.equals(Stomp.V1_1)) { 425 throw new ProtocolException("ACK received without a subscription id for acknowledge!"); 426 } 427 428 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 429 if (ackId == null && this.version.equals(Stomp.V1_2)) { 430 throw new ProtocolException("ACK received without a ack id for acknowledge!"); 431 } 432 433 TransactionId activemqTx = null; 434 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 435 if (stompTx != null) { 436 activemqTx = transactions.get(stompTx); 437 if (activemqTx == null) { 438 throw new ProtocolException("Invalid transaction id: " + stompTx); 439 } 440 } 441 442 boolean acked = false; 443 444 if (ackId != null) { 445 AckEntry pendingAck = this.pedingAcks.remove(ackId); 446 if (pendingAck != null) { 447 messageId = pendingAck.getMessageId(); 448 MessageAck ack = pendingAck.onMessageAck(activemqTx); 449 if (ack != null) { 450 sendToActiveMQ(ack, createResponseHandler(command)); 451 acked = true; 452 } 453 } 454 455 } else if (subscriptionId != null) { 456 StompSubscription sub = this.subscriptions.get(subscriptionId); 457 if (sub != null) { 458 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 459 if (ack != null) { 460 sendToActiveMQ(ack, createResponseHandler(command)); 461 acked = true; 462 } 463 } 464 } else { 465 // STOMP v1.0: acking with just a message id is very bogus since the same message id 466 // could have been sent to 2 different subscriptions on the same Stomp connection. 467 // For example, when 2 subs are created on the same topic. 468 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 469 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 470 if (ack != null) { 471 sendToActiveMQ(ack, createResponseHandler(command)); 472 acked = true; 473 break; 474 } 475 } 476 } 477 478 if (!acked) { 479 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); 480 } 481 } 482 483 protected void onStompBegin(StompFrame command) throws ProtocolException { 484 checkConnected(); 485 486 Map<String, String> headers = command.getHeaders(); 487 488 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 489 490 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { 491 throw new ProtocolException("Must specify the transaction you are beginning"); 492 } 493 494 if (transactions.get(stompTx) != null) { 495 throw new ProtocolException("The transaction was already started: " + stompTx); 496 } 497 498 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); 499 transactions.put(stompTx, activemqTx); 500 501 TransactionInfo tx = new TransactionInfo(); 502 tx.setConnectionId(connectionId); 503 tx.setTransactionId(activemqTx); 504 tx.setType(TransactionInfo.BEGIN); 505 506 sendToActiveMQ(tx, createResponseHandler(command)); 507 } 508 509 protected void onStompCommit(StompFrame command) throws ProtocolException { 510 checkConnected(); 511 512 Map<String, String> headers = command.getHeaders(); 513 514 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 515 if (stompTx == null) { 516 throw new ProtocolException("Must specify the transaction you are committing"); 517 } 518 519 TransactionId activemqTx = transactions.remove(stompTx); 520 if (activemqTx == null) { 521 throw new ProtocolException("Invalid transaction id: " + stompTx); 522 } 523 524 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 525 sub.onStompCommit(activemqTx); 526 } 527 528 pedingAcks.clear(); 529 530 TransactionInfo tx = new TransactionInfo(); 531 tx.setConnectionId(connectionId); 532 tx.setTransactionId(activemqTx); 533 tx.setType(TransactionInfo.COMMIT_ONE_PHASE); 534 535 sendToActiveMQ(tx, createResponseHandler(command)); 536 } 537 538 protected void onStompAbort(StompFrame command) throws ProtocolException { 539 checkConnected(); 540 Map<String, String> headers = command.getHeaders(); 541 542 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 543 if (stompTx == null) { 544 throw new ProtocolException("Must specify the transaction you are committing"); 545 } 546 547 TransactionId activemqTx = transactions.remove(stompTx); 548 if (activemqTx == null) { 549 throw new ProtocolException("Invalid transaction id: " + stompTx); 550 } 551 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 552 try { 553 sub.onStompAbort(activemqTx); 554 } catch (Exception e) { 555 throw new ProtocolException("Transaction abort failed", false, e); 556 } 557 } 558 559 pedingAcks.clear(); 560 561 TransactionInfo tx = new TransactionInfo(); 562 tx.setConnectionId(connectionId); 563 tx.setTransactionId(activemqTx); 564 tx.setType(TransactionInfo.ROLLBACK); 565 566 sendToActiveMQ(tx, createResponseHandler(command)); 567 } 568 569 protected void onStompSubscribe(StompFrame command) throws ProtocolException { 570 checkConnected(); 571 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); 572 Map<String, String> headers = command.getHeaders(); 573 574 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); 575 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); 576 577 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 578 throw new ProtocolException("SUBSCRIBE received without a subscription id!"); 579 } 580 581 if (destination == null || "".equals(destination)) { 582 throw new ProtocolException("Invalid empty or 'null' Destination header"); 583 } 584 585 final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true); 586 587 if (actualDest == null) { 588 throw new ProtocolException("Invalid 'null' Destination."); 589 } 590 591 final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 592 ConsumerInfo consumerInfo = new ConsumerInfo(id); 593 consumerInfo.setPrefetchSize(actualDest.isQueue() ? 594 ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH : 595 headers.containsKey("activemq.subscriptionName") ? 596 ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); 597 consumerInfo.setDispatchAsync(true); 598 599 String browser = headers.get(Stomp.Headers.Subscribe.BROWSER); 600 if (browser != null && browser.equals(Stomp.TRUE)) { 601 602 if (this.version.equals(Stomp.V1_0)) { 603 throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!"); 604 } 605 606 consumerInfo.setBrowser(true); 607 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH); 608 } 609 610 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); 611 if (selector != null) { 612 consumerInfo.setSelector("convert_string_expressions:" + selector); 613 } 614 615 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); 616 617 if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) { 618 throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!"); 619 } 620 621 consumerInfo.setDestination(actualDest); 622 623 StompSubscription stompSubscription; 624 if (!consumerInfo.isBrowser()) { 625 stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 626 } else { 627 stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 628 } 629 stompSubscription.setDestination(actualDest); 630 631 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); 632 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { 633 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); 634 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { 635 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); 636 } else { 637 stompSubscription.setAckMode(StompSubscription.AUTO_ACK); 638 } 639 640 subscriptionsByConsumerId.put(id, stompSubscription); 641 // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set. 642 if (subscriptionId != null) { 643 subscriptions.put(subscriptionId, stompSubscription); 644 } 645 646 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 647 if (receiptId != null && consumerInfo.getPrefetchSize() > 0) { 648 649 final StompFrame cmd = command; 650 final int prefetch = consumerInfo.getPrefetchSize(); 651 652 // Since dispatch could beat the receipt we set prefetch to zero to start and then 653 // once we've sent our Receipt we are safe to turn on dispatch if the response isn't 654 // an error message. 655 consumerInfo.setPrefetchSize(0); 656 657 final ResponseHandler handler = new ResponseHandler() { 658 @Override 659 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 660 if (response.isException()) { 661 // Generally a command can fail.. but that does not invalidate the connection. 662 // We report back the failure but we don't close the connection. 663 Throwable exception = ((ExceptionResponse)response).getException(); 664 handleException(exception, cmd); 665 } else { 666 StompFrame sc = new StompFrame(); 667 sc.setAction(Stomp.Responses.RECEIPT); 668 sc.setHeaders(new HashMap<String, String>(1)); 669 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 670 stompTransport.sendToStomp(sc); 671 672 ConsumerControl control = new ConsumerControl(); 673 control.setPrefetch(prefetch); 674 control.setDestination(actualDest); 675 control.setConsumerId(id); 676 677 sendToActiveMQ(control, null); 678 } 679 } 680 }; 681 682 sendToActiveMQ(consumerInfo, handler); 683 } else { 684 sendToActiveMQ(consumerInfo, createResponseHandler(command)); 685 } 686 } 687 688 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { 689 checkConnected(); 690 Map<String, String> headers = command.getHeaders(); 691 692 ActiveMQDestination destination = null; 693 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); 694 if (o != null) { 695 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true); 696 } 697 698 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); 699 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 700 throw new ProtocolException("UNSUBSCRIBE received without a subscription id!"); 701 } 702 703 if (subscriptionId == null && destination == null) { 704 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); 705 } 706 707 // check if it is a durable subscription 708 String durable = command.getHeaders().get("activemq.subscriptionName"); 709 String clientId = durable; 710 if (!this.version.equals(Stomp.V1_0)) { 711 clientId = connectionInfo.getClientId(); 712 } 713 714 if (durable != null) { 715 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 716 info.setClientId(clientId); 717 info.setSubscriptionName(durable); 718 info.setConnectionId(connectionId); 719 sendToActiveMQ(info, createResponseHandler(command)); 720 return; 721 } 722 723 if (subscriptionId != null) { 724 StompSubscription sub = this.subscriptions.remove(subscriptionId); 725 if (sub != null) { 726 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 727 return; 728 } 729 } else { 730 // Unsubscribing using a destination is a bit weird if multiple subscriptions 731 // are created with the same destination. 732 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 733 StompSubscription sub = iter.next(); 734 if (destination != null && destination.equals(sub.getDestination())) { 735 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 736 iter.remove(); 737 return; 738 } 739 } 740 } 741 742 throw new ProtocolException("No subscription matched."); 743 } 744 745 ConnectionInfo connectionInfo = new ConnectionInfo(); 746 747 protected void onStompConnect(final StompFrame command) throws ProtocolException { 748 749 if (connected.get()) { 750 throw new ProtocolException("Already connected."); 751 } 752 753 final Map<String, String> headers = command.getHeaders(); 754 755 // allow anyone to login for now 756 String login = headers.get(Stomp.Headers.Connect.LOGIN); 757 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE); 758 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID); 759 String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT); 760 761 if (heartBeat == null) { 762 heartBeat = defaultHeartBeat; 763 } 764 765 this.version = StompCodec.detectVersion(headers); 766 767 configureInactivityMonitor(heartBeat.trim()); 768 769 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); 770 connectionInfo.setConnectionId(connectionId); 771 if (clientId != null) { 772 connectionInfo.setClientId(clientId); 773 } else { 774 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 775 } 776 777 connectionInfo.setResponseRequired(true); 778 connectionInfo.setUserName(login); 779 connectionInfo.setPassword(passcode); 780 connectionInfo.setTransportContext(command.getTransportContext()); 781 782 sendToActiveMQ(connectionInfo, new ResponseHandler() { 783 @Override 784 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 785 786 if (response.isException()) { 787 // If the connection attempt fails we close the socket. 788 Throwable exception = ((ExceptionResponse)response).getException(); 789 handleException(exception, command); 790 getStompTransport().onException(IOExceptionSupport.create(exception)); 791 return; 792 } 793 794 final SessionInfo sessionInfo = new SessionInfo(sessionId); 795 sendToActiveMQ(sessionInfo, null); 796 797 final ProducerInfo producerInfo = new ProducerInfo(producerId); 798 sendToActiveMQ(producerInfo, new ResponseHandler() { 799 @Override 800 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 801 802 if (response.isException()) { 803 // If the connection attempt fails we close the socket. 804 Throwable exception = ((ExceptionResponse)response).getException(); 805 handleException(exception, command); 806 getStompTransport().onException(IOExceptionSupport.create(exception)); 807 } 808 809 connected.set(true); 810 HashMap<String, String> responseHeaders = new HashMap<String, String>(); 811 812 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); 813 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); 814 if (requestId == null) { 815 // TODO legacy 816 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED); 817 } 818 if (requestId != null) { 819 // TODO legacy 820 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); 821 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId); 822 } 823 824 responseHeaders.put(Stomp.Headers.Connected.VERSION, version); 825 responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT, 826 String.format("%d,%d", hbWriteInterval, hbReadInterval)); 827 responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION); 828 829 StompFrame sc = new StompFrame(); 830 sc.setAction(Stomp.Responses.CONNECTED); 831 sc.setHeaders(responseHeaders); 832 sendToStomp(sc); 833 834 StompWireFormat format = stompTransport.getWireFormat(); 835 if (format != null) { 836 format.setStompVersion(version); 837 } 838 } 839 }); 840 } 841 }); 842 } 843 844 protected void onStompDisconnect(StompFrame command) throws ProtocolException { 845 if (connected.get()) { 846 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); 847 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); 848 connected.set(false); 849 } 850 } 851 852 protected void checkConnected() throws ProtocolException { 853 if (!connected.get()) { 854 throw new ProtocolException("Not connected."); 855 } 856 } 857 858 /** 859 * Dispatch a ActiveMQ command 860 * 861 * @param command 862 * @throws IOException 863 */ 864 public void onActiveMQCommand(Command command) throws IOException, JMSException { 865 if (command.isResponse()) { 866 Response response = (Response)command; 867 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 868 if (rh != null) { 869 rh.onResponse(this, response); 870 } else { 871 // Pass down any unexpected errors. Should this close the connection? 872 if (response.isException()) { 873 Throwable exception = ((ExceptionResponse)response).getException(); 874 handleException(exception, null); 875 } 876 } 877 } else if (command.isMessageDispatch()) { 878 MessageDispatch md = (MessageDispatch)command; 879 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); 880 if (sub != null) { 881 String ackId = null; 882 if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) { 883 AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub); 884 ackId = this.ACK_ID_GENERATOR.generateId(); 885 this.pedingAcks.put(ackId, pendingAck); 886 } 887 try { 888 sub.onMessageDispatch(md, ackId); 889 } catch (Exception ex) { 890 if (ackId != null) { 891 this.pedingAcks.remove(ackId); 892 } 893 } 894 } 895 } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) { 896 stompTransport.sendToStomp(ping); 897 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 898 // Pass down any unexpected async errors. Should this close the connection? 899 Throwable exception = ((ConnectionError)command).getException(); 900 handleException(exception, null); 901 } 902 } 903 904 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { 905 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); 906 return msg; 907 } 908 909 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { 910 if (ignoreTransformation == true) { 911 return frameTranslator.convertMessage(this, message); 912 } else { 913 FrameTranslator translator = findTranslator( 914 message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory()); 915 return translator.convertMessage(this, message); 916 } 917 } 918 919 public StompTransport getStompTransport() { 920 return stompTransport; 921 } 922 923 public ActiveMQDestination createTempDestination(String name, boolean topic) { 924 ActiveMQDestination rc = tempDestinations.get(name); 925 if( rc == null ) { 926 if (topic) { 927 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); 928 } else { 929 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); 930 } 931 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 932 tempDestinations.put(name, rc); 933 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); 934 } 935 return rc; 936 } 937 938 public String getCreatedTempDestinationName(ActiveMQDestination destination) { 939 return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); 940 } 941 942 public String getDefaultHeartBeat() { 943 return defaultHeartBeat; 944 } 945 946 public void setDefaultHeartBeat(String defaultHeartBeat) { 947 this.defaultHeartBeat = defaultHeartBeat; 948 } 949 950 /** 951 * @return the hbGracePeriodMultiplier 952 */ 953 public float getHbGracePeriodMultiplier() { 954 return hbGracePeriodMultiplier; 955 } 956 957 /** 958 * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set 959 */ 960 public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) { 961 this.hbGracePeriodMultiplier = hbGracePeriodMultiplier; 962 } 963 964 protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { 965 966 String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); 967 968 if (keepAliveOpts == null || keepAliveOpts.length != 2) { 969 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 970 } else { 971 972 try { 973 hbReadInterval = (Long.parseLong(keepAliveOpts[0])); 974 hbWriteInterval = Long.parseLong(keepAliveOpts[1]); 975 } catch(NumberFormatException e) { 976 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 977 } 978 979 try { 980 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); 981 monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier)); 982 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); 983 monitor.setWriteCheckTime(hbWriteInterval); 984 monitor.startMonitoring(); 985 } catch(Exception ex) { 986 hbReadInterval = 0; 987 hbWriteInterval = 0; 988 } 989 990 if (LOG.isDebugEnabled()) { 991 LOG.debug("Stomp Connect heartbeat conf RW[{},{}]", hbReadInterval, hbWriteInterval); 992 } 993 } 994 } 995 996 protected void sendReceipt(StompFrame command) { 997 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 998 if (receiptId != null) { 999 StompFrame sc = new StompFrame(); 1000 sc.setAction(Stomp.Responses.RECEIPT); 1001 sc.setHeaders(new HashMap<String, String>(1)); 1002 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 1003 try { 1004 sendToStomp(sc); 1005 } catch (IOException e) { 1006 LOG.warn("Could not send a receipt for {}", command, e); 1007 } 1008 } 1009 } 1010 1011 /** 1012 * Retrieve the STOMP action value from a frame if the value is valid, otherwise 1013 * return an unknown string to allow for safe log output. 1014 * 1015 * @param command 1016 * The STOMP command to fetch an action from. 1017 * 1018 * @return the command action or a safe string to use in logging. 1019 */ 1020 protected Object safeGetAction(StompFrame command) { 1021 String result = "<Unknown>"; 1022 if (command != null && command.getAction() != null) { 1023 String action = command.getAction().trim(); 1024 1025 if (action != null) { 1026 switch (action) { 1027 case Stomp.Commands.SEND: 1028 case Stomp.Commands.ACK: 1029 case Stomp.Commands.NACK: 1030 case Stomp.Commands.BEGIN: 1031 case Stomp.Commands.COMMIT: 1032 case Stomp.Commands.ABORT: 1033 case Stomp.Commands.SUBSCRIBE: 1034 case Stomp.Commands.UNSUBSCRIBE: 1035 case Stomp.Commands.CONNECT: 1036 case Stomp.Commands.STOMP: 1037 case Stomp.Commands.DISCONNECT: 1038 result = action; 1039 break; 1040 default: 1041 break; 1042 } 1043 } 1044 } 1045 1046 return result; 1047 } 1048}