001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStreamWriter;
024import java.io.PrintWriter;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.atomic.AtomicBoolean;
031
032import javax.jms.JMSException;
033
034import org.apache.activemq.ActiveMQPrefetchPolicy;
035import org.apache.activemq.advisory.AdvisorySupport;
036import org.apache.activemq.broker.BrokerContext;
037import org.apache.activemq.broker.BrokerContextAware;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTempQueue;
041import org.apache.activemq.command.ActiveMQTempTopic;
042import org.apache.activemq.command.Command;
043import org.apache.activemq.command.CommandTypes;
044import org.apache.activemq.command.ConnectionError;
045import org.apache.activemq.command.ConnectionId;
046import org.apache.activemq.command.ConnectionInfo;
047import org.apache.activemq.command.ConsumerControl;
048import org.apache.activemq.command.ConsumerId;
049import org.apache.activemq.command.ConsumerInfo;
050import org.apache.activemq.command.DestinationInfo;
051import org.apache.activemq.command.ExceptionResponse;
052import org.apache.activemq.command.LocalTransactionId;
053import org.apache.activemq.command.MessageAck;
054import org.apache.activemq.command.MessageDispatch;
055import org.apache.activemq.command.MessageId;
056import org.apache.activemq.command.ProducerId;
057import org.apache.activemq.command.ProducerInfo;
058import org.apache.activemq.command.RemoveSubscriptionInfo;
059import org.apache.activemq.command.Response;
060import org.apache.activemq.command.SessionId;
061import org.apache.activemq.command.SessionInfo;
062import org.apache.activemq.command.ShutdownInfo;
063import org.apache.activemq.command.TransactionId;
064import org.apache.activemq.command.TransactionInfo;
065import org.apache.activemq.util.ByteArrayOutputStream;
066import org.apache.activemq.util.FactoryFinder;
067import org.apache.activemq.util.IOExceptionSupport;
068import org.apache.activemq.util.IdGenerator;
069import org.apache.activemq.util.IntrospectionSupport;
070import org.apache.activemq.util.LongSequenceGenerator;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074/**
075 * @author <a href="http://hiramchirino.com">chirino</a>
076 */
077public class ProtocolConverter {
078
079    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
080
081    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
082
083    private static final String BROKER_VERSION;
084    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
085
086    static {
087        String version = "5.6.0";
088        try(InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
089            if (in != null) {
090                try(InputStreamReader isr = new InputStreamReader(in);
091                    BufferedReader reader = new BufferedReader(isr)) {
092                    version = reader.readLine();
093                }
094            }
095        }catch(Exception e) {
096        }
097
098        BROKER_VERSION = version;
099    }
100
101    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
102    private final SessionId sessionId = new SessionId(connectionId, -1);
103    private final ProducerId producerId = new ProducerId(sessionId, 1);
104
105    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
106    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
107    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
108    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
109
110    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
111    private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
112    private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
113    private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
114    private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
115    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
116    private final StompTransport stompTransport;
117
118    private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
119    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
120
121    private final Object commnadIdMutex = new Object();
122    private int lastCommandId;
123    private final AtomicBoolean connected = new AtomicBoolean(false);
124    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
125    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
126    private final BrokerContext brokerContext;
127    private String version = "1.0";
128    private long hbReadInterval;
129    private long hbWriteInterval;
130    private float hbGracePeriodMultiplier = 1.0f;
131    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
132
133    private static class AckEntry {
134
135        private final String messageId;
136        private final StompSubscription subscription;
137
138        public AckEntry(String messageId, StompSubscription subscription) {
139            this.messageId = messageId;
140            this.subscription = subscription;
141        }
142
143        public MessageAck onMessageAck(TransactionId transactionId) {
144            return subscription.onStompMessageAck(messageId, transactionId);
145        }
146
147        public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
148            return subscription.onStompMessageNack(messageId, transactionId);
149        }
150
151        public String getMessageId() {
152            return this.messageId;
153        }
154
155        @SuppressWarnings("unused")
156        public StompSubscription getSubscription() {
157            return this.subscription;
158        }
159    }
160
161    public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
162        this.stompTransport = stompTransport;
163        this.brokerContext = brokerContext;
164    }
165
166    protected int generateCommandId() {
167        synchronized (commnadIdMutex) {
168            return lastCommandId++;
169        }
170    }
171
172    protected ResponseHandler createResponseHandler(final StompFrame command) {
173        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
174        if (receiptId != null) {
175            return new ResponseHandler() {
176                @Override
177                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
178                    if (response.isException()) {
179                        // Generally a command can fail.. but that does not invalidate the connection.
180                        // We report back the failure but we don't close the connection.
181                        Throwable exception = ((ExceptionResponse)response).getException();
182                        handleException(exception, command);
183                    } else {
184                        StompFrame sc = new StompFrame();
185                        sc.setAction(Stomp.Responses.RECEIPT);
186                        sc.setHeaders(new HashMap<String, String>(1));
187                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
188                        stompTransport.sendToStomp(sc);
189                    }
190                }
191            };
192        }
193        return null;
194    }
195
196    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
197        command.setCommandId(generateCommandId());
198        if (handler != null) {
199            command.setResponseRequired(true);
200            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
201        }
202        stompTransport.sendToActiveMQ(command);
203    }
204
205    protected void sendToStomp(StompFrame command) throws IOException {
206        stompTransport.sendToStomp(command);
207    }
208
209    protected FrameTranslator findTranslator(String header) {
210        return findTranslator(header, null, false);
211    }
212
213    protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) {
214        FrameTranslator translator = frameTranslator;
215        try {
216            if (header != null) {
217                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
218            } else {
219                if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) {
220                    translator = new JmsFrameTranslator();
221                }
222            }
223        } catch (Exception ignore) {
224            // if anything goes wrong use the default translator
225        }
226
227        if (translator instanceof BrokerContextAware) {
228            ((BrokerContextAware)translator).setBrokerContext(brokerContext);
229        }
230
231        return translator;
232    }
233
234    /**
235     * Convert a STOMP command
236     *
237     * @param command
238     */
239    public void onStompCommand(StompFrame command) throws IOException, JMSException {
240        try {
241
242            if (command.getClass() == StompFrameError.class) {
243                throw ((StompFrameError)command).getException();
244            }
245
246            String action = command.getAction();
247            if (action.startsWith(Stomp.Commands.SEND)) {
248                onStompSend(command);
249            } else if (action.startsWith(Stomp.Commands.ACK)) {
250                onStompAck(command);
251            } else if (action.startsWith(Stomp.Commands.NACK)) {
252                onStompNack(command);
253            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
254                onStompBegin(command);
255            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
256                onStompCommit(command);
257            } else if (action.startsWith(Stomp.Commands.ABORT)) {
258                onStompAbort(command);
259            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
260                onStompSubscribe(command);
261            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
262                onStompUnsubscribe(command);
263            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
264                       action.startsWith(Stomp.Commands.STOMP)) {
265                onStompConnect(command);
266            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
267                onStompDisconnect(command);
268            } else {
269                throw new ProtocolException("Unknown STOMP action: " + action, true);
270            }
271
272        } catch (ProtocolException e) {
273            handleException(e, command);
274            // Some protocol errors can cause the connection to get closed.
275            if (e.isFatal()) {
276               getStompTransport().onException(e);
277            }
278        }
279    }
280
281    protected void handleException(Throwable exception, StompFrame command) throws IOException {
282        if (command == null) {
283            LOG.warn("Exception occurred while processing a command: {}", exception.toString());
284        } else {
285            LOG.warn("Exception occurred processing: {} -> {}", safeGetAction(command), exception.toString());
286        }
287
288        if (LOG.isDebugEnabled()) {
289            LOG.debug("Exception detail", exception);
290        }
291
292        if (command != null && LOG.isTraceEnabled()) {
293            LOG.trace("Command that caused the error: {}", command);
294        }
295
296        // Let the stomp client know about any protocol errors.
297        ByteArrayOutputStream baos = new ByteArrayOutputStream();
298        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
299        if (exception instanceof SecurityException || exception.getCause() instanceof SecurityException) {
300            stream.write(exception.getLocalizedMessage());
301        } else {
302            exception.printStackTrace(stream);
303        }
304        stream.close();
305
306        HashMap<String, String> headers = new HashMap<String, String>();
307        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
308        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
309
310        if (command != null) {
311            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
312            if (receiptId != null) {
313                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
314            }
315        }
316
317        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
318        sendToStomp(errorMessage);
319    }
320
321    protected void onStompSend(StompFrame command) throws IOException, JMSException {
322        checkConnected();
323
324        Map<String, String> headers = command.getHeaders();
325        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
326        if (destination == null) {
327            throw new ProtocolException("SEND received without a Destination specified!");
328        }
329
330        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
331        headers.remove("transaction");
332
333        ActiveMQMessage message = convertMessage(command);
334
335        message.setProducerId(producerId);
336        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
337        message.setMessageId(id);
338
339        if (stompTx != null) {
340            TransactionId activemqTx = transactions.get(stompTx);
341            if (activemqTx == null) {
342                throw new ProtocolException("Invalid transaction id: " + stompTx);
343            }
344            message.setTransactionId(activemqTx);
345        }
346
347        message.onSend();
348        message.beforeMarshall(null);
349        sendToActiveMQ(message, createResponseHandler(command));
350    }
351
352    protected void onStompNack(StompFrame command) throws ProtocolException {
353
354        checkConnected();
355
356        if (this.version.equals(Stomp.V1_0)) {
357            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
358        }
359
360        Map<String, String> headers = command.getHeaders();
361
362        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
363        if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
364            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
365        }
366
367        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
368        if (messageId == null && !this.version.equals(Stomp.V1_2)) {
369            throw new ProtocolException("NACK received without a message-id to acknowledge!");
370        }
371
372        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
373        if (ackId == null && this.version.equals(Stomp.V1_2)) {
374            throw new ProtocolException("NACK received without an ack header to acknowledge!");
375        }
376
377        TransactionId activemqTx = null;
378        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
379        if (stompTx != null) {
380            activemqTx = transactions.get(stompTx);
381            if (activemqTx == null) {
382                throw new ProtocolException("Invalid transaction id: " + stompTx);
383            }
384        }
385
386        boolean nacked = false;
387
388        if (ackId != null) {
389            AckEntry pendingAck = this.pedingAcks.remove(ackId);
390            if (pendingAck != null) {
391                messageId = pendingAck.getMessageId();
392                MessageAck ack = pendingAck.onMessageNack(activemqTx);
393                if (ack != null) {
394                    sendToActiveMQ(ack, createResponseHandler(command));
395                    nacked = true;
396                }
397            }
398        } else if (subscriptionId != null) {
399            StompSubscription sub = this.subscriptions.get(subscriptionId);
400            if (sub != null) {
401                MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
402                if (ack != null) {
403                    sendToActiveMQ(ack, createResponseHandler(command));
404                    nacked = true;
405                }
406            }
407        }
408
409        if (!nacked) {
410            throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
411        }
412    }
413
414    protected void onStompAck(StompFrame command) throws ProtocolException {
415        checkConnected();
416
417        Map<String, String> headers = command.getHeaders();
418        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
419        if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
420            throw new ProtocolException("ACK received without a message-id to acknowledge!");
421        }
422
423        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
424        if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
425            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
426        }
427
428        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
429        if (ackId == null && this.version.equals(Stomp.V1_2)) {
430            throw new ProtocolException("ACK received without a ack id for acknowledge!");
431        }
432
433        TransactionId activemqTx = null;
434        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
435        if (stompTx != null) {
436            activemqTx = transactions.get(stompTx);
437            if (activemqTx == null) {
438                throw new ProtocolException("Invalid transaction id: " + stompTx);
439            }
440        }
441
442        boolean acked = false;
443
444        if (ackId != null) {
445            AckEntry pendingAck = this.pedingAcks.remove(ackId);
446            if (pendingAck != null) {
447                messageId = pendingAck.getMessageId();
448                MessageAck ack = pendingAck.onMessageAck(activemqTx);
449                if (ack != null) {
450                    sendToActiveMQ(ack, createResponseHandler(command));
451                    acked = true;
452                }
453            }
454
455        } else if (subscriptionId != null) {
456            StompSubscription sub = this.subscriptions.get(subscriptionId);
457            if (sub != null) {
458                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
459                if (ack != null) {
460                    sendToActiveMQ(ack, createResponseHandler(command));
461                    acked = true;
462                }
463            }
464        } else {
465            // STOMP v1.0: acking with just a message id is very bogus since the same message id
466            // could have been sent to 2 different subscriptions on the same Stomp connection.
467            // For example, when 2 subs are created on the same topic.
468            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
469                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
470                if (ack != null) {
471                    sendToActiveMQ(ack, createResponseHandler(command));
472                    acked = true;
473                    break;
474                }
475            }
476        }
477
478        if (!acked) {
479            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
480        }
481    }
482
483    protected void onStompBegin(StompFrame command) throws ProtocolException {
484        checkConnected();
485
486        Map<String, String> headers = command.getHeaders();
487
488        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
489
490        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
491            throw new ProtocolException("Must specify the transaction you are beginning");
492        }
493
494        if (transactions.get(stompTx) != null) {
495            throw new ProtocolException("The transaction was already started: " + stompTx);
496        }
497
498        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
499        transactions.put(stompTx, activemqTx);
500
501        TransactionInfo tx = new TransactionInfo();
502        tx.setConnectionId(connectionId);
503        tx.setTransactionId(activemqTx);
504        tx.setType(TransactionInfo.BEGIN);
505
506        sendToActiveMQ(tx, createResponseHandler(command));
507    }
508
509    protected void onStompCommit(StompFrame command) throws ProtocolException {
510        checkConnected();
511
512        Map<String, String> headers = command.getHeaders();
513
514        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
515        if (stompTx == null) {
516            throw new ProtocolException("Must specify the transaction you are committing");
517        }
518
519        TransactionId activemqTx = transactions.remove(stompTx);
520        if (activemqTx == null) {
521            throw new ProtocolException("Invalid transaction id: " + stompTx);
522        }
523
524        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
525            sub.onStompCommit(activemqTx);
526        }
527
528        pedingAcks.clear();
529
530        TransactionInfo tx = new TransactionInfo();
531        tx.setConnectionId(connectionId);
532        tx.setTransactionId(activemqTx);
533        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
534
535        sendToActiveMQ(tx, createResponseHandler(command));
536    }
537
538    protected void onStompAbort(StompFrame command) throws ProtocolException {
539        checkConnected();
540        Map<String, String> headers = command.getHeaders();
541
542        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
543        if (stompTx == null) {
544            throw new ProtocolException("Must specify the transaction you are committing");
545        }
546
547        TransactionId activemqTx = transactions.remove(stompTx);
548        if (activemqTx == null) {
549            throw new ProtocolException("Invalid transaction id: " + stompTx);
550        }
551        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
552            try {
553                sub.onStompAbort(activemqTx);
554            } catch (Exception e) {
555                throw new ProtocolException("Transaction abort failed", false, e);
556            }
557        }
558
559        pedingAcks.clear();
560
561        TransactionInfo tx = new TransactionInfo();
562        tx.setConnectionId(connectionId);
563        tx.setTransactionId(activemqTx);
564        tx.setType(TransactionInfo.ROLLBACK);
565
566        sendToActiveMQ(tx, createResponseHandler(command));
567    }
568
569    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
570        checkConnected();
571        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
572        Map<String, String> headers = command.getHeaders();
573
574        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
575        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
576
577        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
578            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
579        }
580
581        if (destination == null || "".equals(destination)) {
582            throw new ProtocolException("Invalid empty or 'null' Destination header");
583        }
584
585        final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
586
587        if (actualDest == null) {
588            throw new ProtocolException("Invalid 'null' Destination.");
589        }
590
591        final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
592        ConsumerInfo consumerInfo = new ConsumerInfo(id);
593        consumerInfo.setPrefetchSize(actualDest.isQueue() ?
594                ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH :
595                headers.containsKey("activemq.subscriptionName") ?
596                        ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
597        consumerInfo.setDispatchAsync(true);
598
599        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
600        if (browser != null && browser.equals(Stomp.TRUE)) {
601
602            if (this.version.equals(Stomp.V1_0)) {
603                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!");
604            }
605
606            consumerInfo.setBrowser(true);
607            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH);
608        }
609
610        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
611        if (selector != null) {
612            consumerInfo.setSelector("convert_string_expressions:" + selector);
613        }
614
615        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
616
617        if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
618            throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
619        }
620
621        consumerInfo.setDestination(actualDest);
622
623        StompSubscription stompSubscription;
624        if (!consumerInfo.isBrowser()) {
625            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
626        } else {
627            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
628        }
629        stompSubscription.setDestination(actualDest);
630
631        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
632        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
633            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
634        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
635            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
636        } else {
637            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
638        }
639
640        subscriptionsByConsumerId.put(id, stompSubscription);
641        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
642        if (subscriptionId != null) {
643            subscriptions.put(subscriptionId, stompSubscription);
644        }
645
646        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
647        if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
648
649            final StompFrame cmd = command;
650            final int prefetch = consumerInfo.getPrefetchSize();
651
652            // Since dispatch could beat the receipt we set prefetch to zero to start and then
653            // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
654            // an error message.
655            consumerInfo.setPrefetchSize(0);
656
657            final ResponseHandler handler = new ResponseHandler() {
658                @Override
659                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
660                    if (response.isException()) {
661                        // Generally a command can fail.. but that does not invalidate the connection.
662                        // We report back the failure but we don't close the connection.
663                        Throwable exception = ((ExceptionResponse)response).getException();
664                        handleException(exception, cmd);
665                    } else {
666                        StompFrame sc = new StompFrame();
667                        sc.setAction(Stomp.Responses.RECEIPT);
668                        sc.setHeaders(new HashMap<String, String>(1));
669                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
670                        stompTransport.sendToStomp(sc);
671
672                        ConsumerControl control = new ConsumerControl();
673                        control.setPrefetch(prefetch);
674                        control.setDestination(actualDest);
675                        control.setConsumerId(id);
676
677                        sendToActiveMQ(control, null);
678                    }
679                }
680            };
681
682            sendToActiveMQ(consumerInfo, handler);
683        } else {
684            sendToActiveMQ(consumerInfo, createResponseHandler(command));
685        }
686    }
687
688    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
689        checkConnected();
690        Map<String, String> headers = command.getHeaders();
691
692        ActiveMQDestination destination = null;
693        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
694        if (o != null) {
695            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
696        }
697
698        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
699        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
700            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
701        }
702
703        if (subscriptionId == null && destination == null) {
704            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
705        }
706
707        // check if it is a durable subscription
708        String durable = command.getHeaders().get("activemq.subscriptionName");
709        String clientId = durable;
710        if (!this.version.equals(Stomp.V1_0)) {
711            clientId = connectionInfo.getClientId();
712        }
713
714        if (durable != null) {
715            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
716            info.setClientId(clientId);
717            info.setSubscriptionName(durable);
718            info.setConnectionId(connectionId);
719            sendToActiveMQ(info, createResponseHandler(command));
720            return;
721        }
722
723        if (subscriptionId != null) {
724            StompSubscription sub = this.subscriptions.remove(subscriptionId);
725            if (sub != null) {
726                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
727                return;
728            }
729        } else {
730            // Unsubscribing using a destination is a bit weird if multiple subscriptions
731            // are created with the same destination.
732            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
733                StompSubscription sub = iter.next();
734                if (destination != null && destination.equals(sub.getDestination())) {
735                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
736                    iter.remove();
737                    return;
738                }
739            }
740        }
741
742        throw new ProtocolException("No subscription matched.");
743    }
744
745    ConnectionInfo connectionInfo = new ConnectionInfo();
746
747    protected void onStompConnect(final StompFrame command) throws ProtocolException {
748
749        if (connected.get()) {
750            throw new ProtocolException("Already connected.");
751        }
752
753        final Map<String, String> headers = command.getHeaders();
754
755        // allow anyone to login for now
756        String login = headers.get(Stomp.Headers.Connect.LOGIN);
757        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
758        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
759        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
760
761        if (heartBeat == null) {
762            heartBeat = defaultHeartBeat;
763        }
764
765        this.version = StompCodec.detectVersion(headers);
766
767        configureInactivityMonitor(heartBeat.trim());
768
769        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
770        connectionInfo.setConnectionId(connectionId);
771        if (clientId != null) {
772            connectionInfo.setClientId(clientId);
773        } else {
774            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
775        }
776
777        connectionInfo.setResponseRequired(true);
778        connectionInfo.setUserName(login);
779        connectionInfo.setPassword(passcode);
780        connectionInfo.setTransportContext(command.getTransportContext());
781
782        sendToActiveMQ(connectionInfo, new ResponseHandler() {
783            @Override
784            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
785
786                if (response.isException()) {
787                    // If the connection attempt fails we close the socket.
788                    Throwable exception = ((ExceptionResponse)response).getException();
789                    handleException(exception, command);
790                    getStompTransport().onException(IOExceptionSupport.create(exception));
791                    return;
792                }
793
794                final SessionInfo sessionInfo = new SessionInfo(sessionId);
795                sendToActiveMQ(sessionInfo, null);
796
797                final ProducerInfo producerInfo = new ProducerInfo(producerId);
798                sendToActiveMQ(producerInfo, new ResponseHandler() {
799                    @Override
800                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
801
802                        if (response.isException()) {
803                            // If the connection attempt fails we close the socket.
804                            Throwable exception = ((ExceptionResponse)response).getException();
805                            handleException(exception, command);
806                            getStompTransport().onException(IOExceptionSupport.create(exception));
807                        }
808
809                        connected.set(true);
810                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
811
812                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
813                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
814                        if (requestId == null) {
815                            // TODO legacy
816                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
817                        }
818                        if (requestId != null) {
819                            // TODO legacy
820                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
821                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
822                        }
823
824                        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
825                        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
826                                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
827                        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
828
829                        StompFrame sc = new StompFrame();
830                        sc.setAction(Stomp.Responses.CONNECTED);
831                        sc.setHeaders(responseHeaders);
832                        sendToStomp(sc);
833
834                        StompWireFormat format = stompTransport.getWireFormat();
835                        if (format != null) {
836                            format.setStompVersion(version);
837                        }
838                    }
839                });
840            }
841        });
842    }
843
844    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
845        if (connected.get()) {
846            sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
847            sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
848            connected.set(false);
849        }
850    }
851
852    protected void checkConnected() throws ProtocolException {
853        if (!connected.get()) {
854            throw new ProtocolException("Not connected.");
855        }
856    }
857
858    /**
859     * Dispatch a ActiveMQ command
860     *
861     * @param command
862     * @throws IOException
863     */
864    public void onActiveMQCommand(Command command) throws IOException, JMSException {
865        if (command.isResponse()) {
866            Response response = (Response)command;
867            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
868            if (rh != null) {
869                rh.onResponse(this, response);
870            } else {
871                // Pass down any unexpected errors. Should this close the connection?
872                if (response.isException()) {
873                    Throwable exception = ((ExceptionResponse)response).getException();
874                    handleException(exception, null);
875                }
876            }
877        } else if (command.isMessageDispatch()) {
878            MessageDispatch md = (MessageDispatch)command;
879            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
880            if (sub != null) {
881                String ackId = null;
882                if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
883                    AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
884                    ackId = this.ACK_ID_GENERATOR.generateId();
885                    this.pedingAcks.put(ackId, pendingAck);
886                }
887                try {
888                    sub.onMessageDispatch(md, ackId);
889                } catch (Exception ex) {
890                    if (ackId != null) {
891                        this.pedingAcks.remove(ackId);
892                    }
893                }
894            }
895        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
896            stompTransport.sendToStomp(ping);
897        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
898            // Pass down any unexpected async errors. Should this close the connection?
899            Throwable exception = ((ConnectionError)command).getException();
900            handleException(exception, null);
901        }
902    }
903
904    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
905        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
906        return msg;
907    }
908
909    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
910        if (ignoreTransformation == true) {
911            return frameTranslator.convertMessage(this, message);
912        } else {
913            FrameTranslator translator = findTranslator(
914                message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory());
915            return translator.convertMessage(this, message);
916        }
917    }
918
919    public StompTransport getStompTransport() {
920        return stompTransport;
921    }
922
923    public ActiveMQDestination createTempDestination(String name, boolean topic) {
924        ActiveMQDestination rc = tempDestinations.get(name);
925        if( rc == null ) {
926            if (topic) {
927                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
928            } else {
929                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
930            }
931            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
932            tempDestinations.put(name, rc);
933            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
934        }
935        return rc;
936    }
937
938    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
939        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
940    }
941
942    public String getDefaultHeartBeat() {
943        return defaultHeartBeat;
944    }
945
946    public void setDefaultHeartBeat(String defaultHeartBeat) {
947        this.defaultHeartBeat = defaultHeartBeat;
948    }
949
950    /**
951     * @return the hbGracePeriodMultiplier
952     */
953    public float getHbGracePeriodMultiplier() {
954        return hbGracePeriodMultiplier;
955    }
956
957    /**
958     * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set
959     */
960    public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) {
961        this.hbGracePeriodMultiplier = hbGracePeriodMultiplier;
962    }
963
964    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
965
966        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
967
968        if (keepAliveOpts == null || keepAliveOpts.length != 2) {
969            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
970        } else {
971
972            try {
973                hbReadInterval = (Long.parseLong(keepAliveOpts[0]));
974                hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
975            } catch(NumberFormatException e) {
976                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
977            }
978
979            try {
980                StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
981                monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier));
982                monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
983                monitor.setWriteCheckTime(hbWriteInterval);
984                monitor.startMonitoring();
985            } catch(Exception ex) {
986                hbReadInterval = 0;
987                hbWriteInterval = 0;
988            }
989
990            if (LOG.isDebugEnabled()) {
991                LOG.debug("Stomp Connect heartbeat conf RW[{},{}]", hbReadInterval, hbWriteInterval);
992            }
993        }
994    }
995
996    protected void sendReceipt(StompFrame command) {
997        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
998        if (receiptId != null) {
999            StompFrame sc = new StompFrame();
1000            sc.setAction(Stomp.Responses.RECEIPT);
1001            sc.setHeaders(new HashMap<String, String>(1));
1002            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
1003            try {
1004                sendToStomp(sc);
1005            } catch (IOException e) {
1006                LOG.warn("Could not send a receipt for {}", command, e);
1007            }
1008        }
1009    }
1010
1011    /**
1012     * Retrieve the STOMP action value from a frame if the value is valid, otherwise
1013     * return an unknown string to allow for safe log output.
1014     *
1015     * @param command
1016     *      The STOMP command to fetch an action from.
1017     *
1018     * @return the command action or a safe string to use in logging.
1019     */
1020    protected Object safeGetAction(StompFrame command) {
1021        String result = "<Unknown>";
1022        if (command != null && command.getAction() != null) {
1023            String action = command.getAction().trim();
1024
1025            if (action != null) {
1026                switch (action) {
1027                    case Stomp.Commands.SEND:
1028                    case Stomp.Commands.ACK:
1029                    case Stomp.Commands.NACK:
1030                    case Stomp.Commands.BEGIN:
1031                    case Stomp.Commands.COMMIT:
1032                    case Stomp.Commands.ABORT:
1033                    case Stomp.Commands.SUBSCRIBE:
1034                    case Stomp.Commands.UNSUBSCRIBE:
1035                    case Stomp.Commands.CONNECT:
1036                    case Stomp.Commands.STOMP:
1037                    case Stomp.Commands.DISCONNECT:
1038                        result = action;
1039                        break;
1040                    default:
1041                        break;
1042                }
1043            }
1044        }
1045
1046        return result;
1047    }
1048}