001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.io.OutputStreamWriter;
024import java.io.PrintWriter;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.atomic.AtomicBoolean;
031
032import javax.jms.JMSException;
033
034import org.apache.activemq.ActiveMQPrefetchPolicy;
035import org.apache.activemq.advisory.AdvisorySupport;
036import org.apache.activemq.broker.BrokerContext;
037import org.apache.activemq.broker.BrokerContextAware;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTempQueue;
041import org.apache.activemq.command.ActiveMQTempTopic;
042import org.apache.activemq.command.Command;
043import org.apache.activemq.command.CommandTypes;
044import org.apache.activemq.command.ConnectionError;
045import org.apache.activemq.command.ConnectionId;
046import org.apache.activemq.command.ConnectionInfo;
047import org.apache.activemq.command.ConsumerControl;
048import org.apache.activemq.command.ConsumerId;
049import org.apache.activemq.command.ConsumerInfo;
050import org.apache.activemq.command.DestinationInfo;
051import org.apache.activemq.command.ExceptionResponse;
052import org.apache.activemq.command.LocalTransactionId;
053import org.apache.activemq.command.MessageAck;
054import org.apache.activemq.command.MessageDispatch;
055import org.apache.activemq.command.MessageId;
056import org.apache.activemq.command.ProducerId;
057import org.apache.activemq.command.ProducerInfo;
058import org.apache.activemq.command.RemoveSubscriptionInfo;
059import org.apache.activemq.command.Response;
060import org.apache.activemq.command.SessionId;
061import org.apache.activemq.command.SessionInfo;
062import org.apache.activemq.command.ShutdownInfo;
063import org.apache.activemq.command.TransactionId;
064import org.apache.activemq.command.TransactionInfo;
065import org.apache.activemq.util.ByteArrayOutputStream;
066import org.apache.activemq.util.FactoryFinder;
067import org.apache.activemq.util.IOExceptionSupport;
068import org.apache.activemq.util.IdGenerator;
069import org.apache.activemq.util.IntrospectionSupport;
070import org.apache.activemq.util.LongSequenceGenerator;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074/**
075 * @author <a href="http://hiramchirino.com">chirino</a>
076 */
077public class ProtocolConverter {
078
private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);

// Source of the ids used to build each converter's ConnectionId.
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

// Broker version string; resolved exactly once by the static initializer below.
private static final String BROKER_VERSION;
// Shared frame carrying the STOMP KEEPALIVE command.
private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);

static {
    // Default used when version.txt is missing, unreadable or empty.
    String version = "5.6.0";
    InputStream in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt");
    if (in != null) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        try {
            String line = reader.readLine();
            // readLine() returns null for an empty resource; never let that
            // overwrite the default and leave BROKER_VERSION null.
            if (line != null && line.trim().length() > 0) {
                version = line;
            }
        } catch (Exception e) {
            // Best effort only: a broken resource must not prevent class loading.
            LOG.debug("Could not read the broker version resource; using the default", e);
        } finally {
            // Closing the reader also releases the underlying resource stream.
            try {
                reader.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if close fails here.
            }
        }
    }
    BROKER_VERSION = version;
}
098
099    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
100    private final SessionId sessionId = new SessionId(connectionId, -1);
101    private final ProducerId producerId = new ProducerId(sessionId, 1);
102
103    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
104    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
105    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
106    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
107
108    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
109    private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
110    private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
111    private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
112    private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
113    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
114    private final StompTransport stompTransport;
115
116    private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
117    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
118
119    private final Object commnadIdMutex = new Object();
120    private int lastCommandId;
121    private final AtomicBoolean connected = new AtomicBoolean(false);
122    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
123    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
124    private final BrokerContext brokerContext;
125    private String version = "1.0";
126    private long hbReadInterval;
127    private long hbWriteInterval;
128    private float hbGracePeriodMultiplier = 1.0f;
129    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
130
131    private static class AckEntry {
132
133        private final String messageId;
134        private final StompSubscription subscription;
135
136        public AckEntry(String messageId, StompSubscription subscription) {
137            this.messageId = messageId;
138            this.subscription = subscription;
139        }
140
141        public MessageAck onMessageAck(TransactionId transactionId) {
142            return subscription.onStompMessageAck(messageId, transactionId);
143        }
144
145        public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
146            return subscription.onStompMessageNack(messageId, transactionId);
147        }
148
149        public String getMessageId() {
150            return this.messageId;
151        }
152
153        @SuppressWarnings("unused")
154        public StompSubscription getSubscription() {
155            return this.subscription;
156        }
157    }
158
/**
 * Creates a converter bound to one STOMP transport.
 *
 * @param stompTransport transport used to send frames to the client and commands to the broker
 * @param brokerContext  context handed to translators that implement {@code BrokerContextAware}
 */
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
    this.brokerContext = brokerContext;
    this.stompTransport = stompTransport;
}
163
164    protected int generateCommandId() {
165        synchronized (commnadIdMutex) {
166            return lastCommandId++;
167        }
168    }
169
170    protected ResponseHandler createResponseHandler(final StompFrame command) {
171        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
172        if (receiptId != null) {
173            return new ResponseHandler() {
174                @Override
175                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
176                    if (response.isException()) {
177                        // Generally a command can fail.. but that does not invalidate the connection.
178                        // We report back the failure but we don't close the connection.
179                        Throwable exception = ((ExceptionResponse)response).getException();
180                        handleException(exception, command);
181                    } else {
182                        StompFrame sc = new StompFrame();
183                        sc.setAction(Stomp.Responses.RECEIPT);
184                        sc.setHeaders(new HashMap<String, String>(1));
185                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
186                        stompTransport.sendToStomp(sc);
187                    }
188                }
189            };
190        }
191        return null;
192    }
193
194    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
195        command.setCommandId(generateCommandId());
196        if (handler != null) {
197            command.setResponseRequired(true);
198            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
199        }
200        stompTransport.sendToActiveMQ(command);
201    }
202
203    protected void sendToStomp(StompFrame command) throws IOException {
204        stompTransport.sendToStomp(command);
205    }
206
207    protected FrameTranslator findTranslator(String header) {
208        return findTranslator(header, null, false);
209    }
210
211    protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) {
212        FrameTranslator translator = frameTranslator;
213        try {
214            if (header != null) {
215                translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header);
216            } else {
217                if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) {
218                    translator = new JmsFrameTranslator();
219                }
220            }
221        } catch (Exception ignore) {
222            // if anything goes wrong use the default translator
223        }
224
225        if (translator instanceof BrokerContextAware) {
226            ((BrokerContextAware)translator).setBrokerContext(brokerContext);
227        }
228
229        return translator;
230    }
231
232    /**
233     * Convert a STOMP command
234     *
235     * @param command
236     */
237    public void onStompCommand(StompFrame command) throws IOException, JMSException {
238        try {
239
240            if (command.getClass() == StompFrameError.class) {
241                throw ((StompFrameError)command).getException();
242            }
243
244            String action = command.getAction();
245            if (action.startsWith(Stomp.Commands.SEND)) {
246                onStompSend(command);
247            } else if (action.startsWith(Stomp.Commands.ACK)) {
248                onStompAck(command);
249            } else if (action.startsWith(Stomp.Commands.NACK)) {
250                onStompNack(command);
251            } else if (action.startsWith(Stomp.Commands.BEGIN)) {
252                onStompBegin(command);
253            } else if (action.startsWith(Stomp.Commands.COMMIT)) {
254                onStompCommit(command);
255            } else if (action.startsWith(Stomp.Commands.ABORT)) {
256                onStompAbort(command);
257            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
258                onStompSubscribe(command);
259            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
260                onStompUnsubscribe(command);
261            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
262                       action.startsWith(Stomp.Commands.STOMP)) {
263                onStompConnect(command);
264            } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
265                onStompDisconnect(command);
266            } else {
267                throw new ProtocolException("Unknown STOMP action: " + action, true);
268            }
269
270        } catch (ProtocolException e) {
271            handleException(e, command);
272            // Some protocol errors can cause the connection to get closed.
273            if (e.isFatal()) {
274               getStompTransport().onException(e);
275            }
276        }
277    }
278
279    protected void handleException(Throwable exception, StompFrame command) throws IOException {
280        if (command == null) {
281            LOG.warn("Exception occurred while processing a command: {}", exception.toString());
282        } else {
283            LOG.warn("Exception occurred processing: {} -> {}", safeGetAction(command), exception.toString());
284        }
285
286        if (LOG.isDebugEnabled()) {
287            LOG.debug("Exception detail", exception);
288        }
289
290        if (command != null && LOG.isTraceEnabled()) {
291            LOG.trace("Command that caused the error: {}", command);
292        }
293
294        // Let the stomp client know about any protocol errors.
295        ByteArrayOutputStream baos = new ByteArrayOutputStream();
296        PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
297        exception.printStackTrace(stream);
298        stream.close();
299
300        HashMap<String, String> headers = new HashMap<String, String>();
301        headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
302        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
303
304        if (command != null) {
305            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
306            if (receiptId != null) {
307                headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
308            }
309        }
310
311        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
312        sendToStomp(errorMessage);
313    }
314
315    protected void onStompSend(StompFrame command) throws IOException, JMSException {
316        checkConnected();
317
318        Map<String, String> headers = command.getHeaders();
319        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
320        if (destination == null) {
321            throw new ProtocolException("SEND received without a Destination specified!");
322        }
323
324        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
325        headers.remove("transaction");
326
327        ActiveMQMessage message = convertMessage(command);
328
329        message.setProducerId(producerId);
330        MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
331        message.setMessageId(id);
332
333        if (stompTx != null) {
334            TransactionId activemqTx = transactions.get(stompTx);
335            if (activemqTx == null) {
336                throw new ProtocolException("Invalid transaction id: " + stompTx);
337            }
338            message.setTransactionId(activemqTx);
339        }
340
341        message.onSend();
342        message.beforeMarshall(null);
343        sendToActiveMQ(message, createResponseHandler(command));
344    }
345
346    protected void onStompNack(StompFrame command) throws ProtocolException {
347
348        checkConnected();
349
350        if (this.version.equals(Stomp.V1_0)) {
351            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
352        }
353
354        Map<String, String> headers = command.getHeaders();
355
356        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
357        if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
358            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
359        }
360
361        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
362        if (messageId == null && !this.version.equals(Stomp.V1_2)) {
363            throw new ProtocolException("NACK received without a message-id to acknowledge!");
364        }
365
366        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
367        if (ackId == null && this.version.equals(Stomp.V1_2)) {
368            throw new ProtocolException("NACK received without an ack header to acknowledge!");
369        }
370
371        TransactionId activemqTx = null;
372        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
373        if (stompTx != null) {
374            activemqTx = transactions.get(stompTx);
375            if (activemqTx == null) {
376                throw new ProtocolException("Invalid transaction id: " + stompTx);
377            }
378        }
379
380        boolean nacked = false;
381
382        if (ackId != null) {
383            AckEntry pendingAck = this.pedingAcks.remove(ackId);
384            if (pendingAck != null) {
385                messageId = pendingAck.getMessageId();
386                MessageAck ack = pendingAck.onMessageNack(activemqTx);
387                if (ack != null) {
388                    sendToActiveMQ(ack, createResponseHandler(command));
389                    nacked = true;
390                }
391            }
392        } else if (subscriptionId != null) {
393            StompSubscription sub = this.subscriptions.get(subscriptionId);
394            if (sub != null) {
395                MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
396                if (ack != null) {
397                    sendToActiveMQ(ack, createResponseHandler(command));
398                    nacked = true;
399                }
400            }
401        }
402
403        if (!nacked) {
404            throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
405        }
406    }
407
408    protected void onStompAck(StompFrame command) throws ProtocolException {
409        checkConnected();
410
411        Map<String, String> headers = command.getHeaders();
412        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
413        if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
414            throw new ProtocolException("ACK received without a message-id to acknowledge!");
415        }
416
417        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
418        if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
419            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
420        }
421
422        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
423        if (ackId == null && this.version.equals(Stomp.V1_2)) {
424            throw new ProtocolException("ACK received without a ack id for acknowledge!");
425        }
426
427        TransactionId activemqTx = null;
428        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
429        if (stompTx != null) {
430            activemqTx = transactions.get(stompTx);
431            if (activemqTx == null) {
432                throw new ProtocolException("Invalid transaction id: " + stompTx);
433            }
434        }
435
436        boolean acked = false;
437
438        if (ackId != null) {
439            AckEntry pendingAck = this.pedingAcks.remove(ackId);
440            if (pendingAck != null) {
441                messageId = pendingAck.getMessageId();
442                MessageAck ack = pendingAck.onMessageAck(activemqTx);
443                if (ack != null) {
444                    sendToActiveMQ(ack, createResponseHandler(command));
445                    acked = true;
446                }
447            }
448
449        } else if (subscriptionId != null) {
450            StompSubscription sub = this.subscriptions.get(subscriptionId);
451            if (sub != null) {
452                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
453                if (ack != null) {
454                    sendToActiveMQ(ack, createResponseHandler(command));
455                    acked = true;
456                }
457            }
458        } else {
459            // STOMP v1.0: acking with just a message id is very bogus since the same message id
460            // could have been sent to 2 different subscriptions on the same Stomp connection.
461            // For example, when 2 subs are created on the same topic.
462            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
463                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
464                if (ack != null) {
465                    sendToActiveMQ(ack, createResponseHandler(command));
466                    acked = true;
467                    break;
468                }
469            }
470        }
471
472        if (!acked) {
473            throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
474        }
475    }
476
477    protected void onStompBegin(StompFrame command) throws ProtocolException {
478        checkConnected();
479
480        Map<String, String> headers = command.getHeaders();
481
482        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
483
484        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
485            throw new ProtocolException("Must specify the transaction you are beginning");
486        }
487
488        if (transactions.get(stompTx) != null) {
489            throw new ProtocolException("The transaction was already started: " + stompTx);
490        }
491
492        LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
493        transactions.put(stompTx, activemqTx);
494
495        TransactionInfo tx = new TransactionInfo();
496        tx.setConnectionId(connectionId);
497        tx.setTransactionId(activemqTx);
498        tx.setType(TransactionInfo.BEGIN);
499
500        sendToActiveMQ(tx, createResponseHandler(command));
501    }
502
503    protected void onStompCommit(StompFrame command) throws ProtocolException {
504        checkConnected();
505
506        Map<String, String> headers = command.getHeaders();
507
508        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
509        if (stompTx == null) {
510            throw new ProtocolException("Must specify the transaction you are committing");
511        }
512
513        TransactionId activemqTx = transactions.remove(stompTx);
514        if (activemqTx == null) {
515            throw new ProtocolException("Invalid transaction id: " + stompTx);
516        }
517
518        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
519            sub.onStompCommit(activemqTx);
520        }
521
522        pedingAcks.clear();
523
524        TransactionInfo tx = new TransactionInfo();
525        tx.setConnectionId(connectionId);
526        tx.setTransactionId(activemqTx);
527        tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
528
529        sendToActiveMQ(tx, createResponseHandler(command));
530    }
531
532    protected void onStompAbort(StompFrame command) throws ProtocolException {
533        checkConnected();
534        Map<String, String> headers = command.getHeaders();
535
536        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
537        if (stompTx == null) {
538            throw new ProtocolException("Must specify the transaction you are committing");
539        }
540
541        TransactionId activemqTx = transactions.remove(stompTx);
542        if (activemqTx == null) {
543            throw new ProtocolException("Invalid transaction id: " + stompTx);
544        }
545        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
546            try {
547                sub.onStompAbort(activemqTx);
548            } catch (Exception e) {
549                throw new ProtocolException("Transaction abort failed", false, e);
550            }
551        }
552
553        pedingAcks.clear();
554
555        TransactionInfo tx = new TransactionInfo();
556        tx.setConnectionId(connectionId);
557        tx.setTransactionId(activemqTx);
558        tx.setType(TransactionInfo.ROLLBACK);
559
560        sendToActiveMQ(tx, createResponseHandler(command));
561    }
562
563    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
564        checkConnected();
565        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
566        Map<String, String> headers = command.getHeaders();
567
568        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
569        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
570
571        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
572            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
573        }
574
575        final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
576
577        if (actualDest == null) {
578            throw new ProtocolException("Invalid 'null' Destination.");
579        }
580
581        final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
582        ConsumerInfo consumerInfo = new ConsumerInfo(id);
583        consumerInfo.setPrefetchSize(actualDest.isQueue() ?
584                ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH :
585                headers.containsKey("activemq.subscriptionName") ?
586                        ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
587        consumerInfo.setDispatchAsync(true);
588
589        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
590        if (browser != null && browser.equals(Stomp.TRUE)) {
591
592            if (this.version.equals(Stomp.V1_0)) {
593                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!");
594            }
595
596            consumerInfo.setBrowser(true);
597            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH);
598        }
599
600        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
601        if (selector != null) {
602            consumerInfo.setSelector("convert_string_expressions:" + selector);
603        }
604
605        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
606
607        if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
608            throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
609        }
610
611        consumerInfo.setDestination(actualDest);
612
613        StompSubscription stompSubscription;
614        if (!consumerInfo.isBrowser()) {
615            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
616        } else {
617            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
618        }
619        stompSubscription.setDestination(actualDest);
620
621        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
622        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
623            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
624        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
625            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
626        } else {
627            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
628        }
629
630        subscriptionsByConsumerId.put(id, stompSubscription);
631        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
632        if (subscriptionId != null) {
633            subscriptions.put(subscriptionId, stompSubscription);
634        }
635
636        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
637        if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
638
639            final StompFrame cmd = command;
640            final int prefetch = consumerInfo.getPrefetchSize();
641
642            // Since dispatch could beat the receipt we set prefetch to zero to start and then
643            // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
644            // an error message.
645            consumerInfo.setPrefetchSize(0);
646
647            final ResponseHandler handler = new ResponseHandler() {
648                @Override
649                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
650                    if (response.isException()) {
651                        // Generally a command can fail.. but that does not invalidate the connection.
652                        // We report back the failure but we don't close the connection.
653                        Throwable exception = ((ExceptionResponse)response).getException();
654                        handleException(exception, cmd);
655                    } else {
656                        StompFrame sc = new StompFrame();
657                        sc.setAction(Stomp.Responses.RECEIPT);
658                        sc.setHeaders(new HashMap<String, String>(1));
659                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
660                        stompTransport.sendToStomp(sc);
661
662                        ConsumerControl control = new ConsumerControl();
663                        control.setPrefetch(prefetch);
664                        control.setDestination(actualDest);
665                        control.setConsumerId(id);
666
667                        sendToActiveMQ(control, null);
668                    }
669                }
670            };
671
672            sendToActiveMQ(consumerInfo, handler);
673        } else {
674            sendToActiveMQ(consumerInfo, createResponseHandler(command));
675        }
676    }
677
678    protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
679        checkConnected();
680        Map<String, String> headers = command.getHeaders();
681
682        ActiveMQDestination destination = null;
683        Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
684        if (o != null) {
685            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
686        }
687
688        String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
689        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
690            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
691        }
692
693        if (subscriptionId == null && destination == null) {
694            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
695        }
696
697        // check if it is a durable subscription
698        String durable = command.getHeaders().get("activemq.subscriptionName");
699        String clientId = durable;
700        if (!this.version.equals(Stomp.V1_0)) {
701            clientId = connectionInfo.getClientId();
702        }
703
704        if (durable != null) {
705            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
706            info.setClientId(clientId);
707            info.setSubscriptionName(durable);
708            info.setConnectionId(connectionId);
709            sendToActiveMQ(info, createResponseHandler(command));
710            return;
711        }
712
713        if (subscriptionId != null) {
714            StompSubscription sub = this.subscriptions.remove(subscriptionId);
715            if (sub != null) {
716                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
717                return;
718            }
719        } else {
720            // Unsubscribing using a destination is a bit weird if multiple subscriptions
721            // are created with the same destination.
722            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
723                StompSubscription sub = iter.next();
724                if (destination != null && destination.equals(sub.getDestination())) {
725                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
726                    iter.remove();
727                    return;
728                }
729            }
730        }
731
732        throw new ProtocolException("No subscription matched.");
733    }
734
    // Connection details sent to the broker when a CONNECT frame is processed; the same
    // instance supplies the client id for unsubscribes and the remove command on DISCONNECT.
    ConnectionInfo connectionInfo = new ConnectionInfo();
736
737    protected void onStompConnect(final StompFrame command) throws ProtocolException {
738
739        if (connected.get()) {
740            throw new ProtocolException("Already connected.");
741        }
742
743        final Map<String, String> headers = command.getHeaders();
744
745        // allow anyone to login for now
746        String login = headers.get(Stomp.Headers.Connect.LOGIN);
747        String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
748        String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
749        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
750
751        if (heartBeat == null) {
752            heartBeat = defaultHeartBeat;
753        }
754
755        this.version = StompCodec.detectVersion(headers);
756
757        configureInactivityMonitor(heartBeat.trim());
758
759        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
760        connectionInfo.setConnectionId(connectionId);
761        if (clientId != null) {
762            connectionInfo.setClientId(clientId);
763        } else {
764            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
765        }
766
767        connectionInfo.setResponseRequired(true);
768        connectionInfo.setUserName(login);
769        connectionInfo.setPassword(passcode);
770        connectionInfo.setTransportContext(command.getTransportContext());
771
772        sendToActiveMQ(connectionInfo, new ResponseHandler() {
773            @Override
774            public void onResponse(ProtocolConverter converter, Response response) throws IOException {
775
776                if (response.isException()) {
777                    // If the connection attempt fails we close the socket.
778                    Throwable exception = ((ExceptionResponse)response).getException();
779                    handleException(exception, command);
780                    getStompTransport().onException(IOExceptionSupport.create(exception));
781                    return;
782                }
783
784                final SessionInfo sessionInfo = new SessionInfo(sessionId);
785                sendToActiveMQ(sessionInfo, null);
786
787                final ProducerInfo producerInfo = new ProducerInfo(producerId);
788                sendToActiveMQ(producerInfo, new ResponseHandler() {
789                    @Override
790                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
791
792                        if (response.isException()) {
793                            // If the connection attempt fails we close the socket.
794                            Throwable exception = ((ExceptionResponse)response).getException();
795                            handleException(exception, command);
796                            getStompTransport().onException(IOExceptionSupport.create(exception));
797                        }
798
799                        connected.set(true);
800                        HashMap<String, String> responseHeaders = new HashMap<String, String>();
801
802                        responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
803                        String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
804                        if (requestId == null) {
805                            // TODO legacy
806                            requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
807                        }
808                        if (requestId != null) {
809                            // TODO legacy
810                            responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
811                            responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
812                        }
813
814                        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
815                        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
816                                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
817                        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
818
819                        StompFrame sc = new StompFrame();
820                        sc.setAction(Stomp.Responses.CONNECTED);
821                        sc.setHeaders(responseHeaders);
822                        sendToStomp(sc);
823
824                        StompWireFormat format = stompTransport.getWireFormat();
825                        if (format != null) {
826                            format.setStompVersion(version);
827                        }
828                    }
829                });
830            }
831        });
832    }
833
834    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
835        if (connected.get()) {
836            sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
837            sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
838            connected.set(false);
839        }
840    }
841
842    protected void checkConnected() throws ProtocolException {
843        if (!connected.get()) {
844            throw new ProtocolException("Not connected.");
845        }
846    }
847
848    /**
849     * Dispatch a ActiveMQ command
850     *
851     * @param command
852     * @throws IOException
853     */
854    public void onActiveMQCommand(Command command) throws IOException, JMSException {
855        if (command.isResponse()) {
856            Response response = (Response)command;
857            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
858            if (rh != null) {
859                rh.onResponse(this, response);
860            } else {
861                // Pass down any unexpected errors. Should this close the connection?
862                if (response.isException()) {
863                    Throwable exception = ((ExceptionResponse)response).getException();
864                    handleException(exception, null);
865                }
866            }
867        } else if (command.isMessageDispatch()) {
868            MessageDispatch md = (MessageDispatch)command;
869            StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
870            if (sub != null) {
871                String ackId = null;
872                if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
873                    AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
874                    ackId = this.ACK_ID_GENERATOR.generateId();
875                    this.pedingAcks.put(ackId, pendingAck);
876                }
877                try {
878                    sub.onMessageDispatch(md, ackId);
879                } catch (Exception ex) {
880                    if (ackId != null) {
881                        this.pedingAcks.remove(ackId);
882                    }
883                }
884            }
885        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
886            stompTransport.sendToStomp(ping);
887        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
888            // Pass down any unexpected async errors. Should this close the connection?
889            Throwable exception = ((ConnectionError)command).getException();
890            handleException(exception, null);
891        }
892    }
893
894    public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
895        ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
896        return msg;
897    }
898
899    public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
900        if (ignoreTransformation == true) {
901            return frameTranslator.convertMessage(this, message);
902        } else {
903            FrameTranslator translator = findTranslator(
904                message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory());
905            return translator.convertMessage(this, message);
906        }
907    }
908
909    public StompTransport getStompTransport() {
910        return stompTransport;
911    }
912
913    public ActiveMQDestination createTempDestination(String name, boolean topic) {
914        ActiveMQDestination rc = tempDestinations.get(name);
915        if( rc == null ) {
916            if (topic) {
917                rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
918            } else {
919                rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
920            }
921            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
922            tempDestinations.put(name, rc);
923            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
924        }
925        return rc;
926    }
927
928    public String getCreatedTempDestinationName(ActiveMQDestination destination) {
929        return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
930    }
931
932    public String getDefaultHeartBeat() {
933        return defaultHeartBeat;
934    }
935
    /**
     * @param defaultHeartBeat
     *      the heart-beat setting to use when a CONNECT frame has no heart-beat header.
     */
    public void setDefaultHeartBeat(String defaultHeartBeat) {
        this.defaultHeartBeat = defaultHeartBeat;
    }
939
940    /**
941     * @return the hbGracePeriodMultiplier
942     */
943    public float getHbGracePeriodMultiplier() {
944        return hbGracePeriodMultiplier;
945    }
946
    /**
     * @param hbGracePeriodMultiplier
     *      the multiplier to apply to the read heart-beat interval when the read check
     *      time of the inactivity monitor is configured.
     */
    public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) {
        this.hbGracePeriodMultiplier = hbGracePeriodMultiplier;
    }
953
954    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
955
956        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
957
958        if (keepAliveOpts == null || keepAliveOpts.length != 2) {
959            throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
960        } else {
961
962            try {
963                hbReadInterval = (Long.parseLong(keepAliveOpts[0]));
964                hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
965            } catch(NumberFormatException e) {
966                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
967            }
968
969            try {
970                StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
971                monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier));
972                monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
973                monitor.setWriteCheckTime(hbWriteInterval);
974                monitor.startMonitoring();
975            } catch(Exception ex) {
976                hbReadInterval = 0;
977                hbWriteInterval = 0;
978            }
979
980            if (LOG.isDebugEnabled()) {
981                LOG.debug("Stomp Connect heartbeat conf RW[{},{}]", hbReadInterval, hbWriteInterval);
982            }
983        }
984    }
985
986    protected void sendReceipt(StompFrame command) {
987        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
988        if (receiptId != null) {
989            StompFrame sc = new StompFrame();
990            sc.setAction(Stomp.Responses.RECEIPT);
991            sc.setHeaders(new HashMap<String, String>(1));
992            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
993            try {
994                sendToStomp(sc);
995            } catch (IOException e) {
996                LOG.warn("Could not send a receipt for {}", command, e);
997            }
998        }
999    }
1000
1001    /**
1002     * Retrieve the STOMP action value from a frame if the value is valid, otherwise
1003     * return an unknown string to allow for safe log output.
1004     *
1005     * @param command
1006     *      The STOMP command to fetch an action from.
1007     *
1008     * @return the command action or a safe string to use in logging.
1009     */
1010    protected Object safeGetAction(StompFrame command) {
1011        String result = "<Unknown>";
1012        if (command != null && command.getAction() != null) {
1013            String action = command.getAction().trim();
1014
1015            if (action != null) {
1016                switch (action) {
1017                    case Stomp.Commands.SEND:
1018                    case Stomp.Commands.ACK:
1019                    case Stomp.Commands.NACK:
1020                    case Stomp.Commands.BEGIN:
1021                    case Stomp.Commands.COMMIT:
1022                    case Stomp.Commands.ABORT:
1023                    case Stomp.Commands.SUBSCRIBE:
1024                    case Stomp.Commands.UNSUBSCRIBE:
1025                    case Stomp.Commands.CONNECT:
1026                    case Stomp.Commands.STOMP:
1027                    case Stomp.Commands.DISCONNECT:
1028                        result = action;
1029                        break;
1030                    default:
1031                        break;
1032                }
1033            }
1034        }
1035
1036        return result;
1037    }
1038}