001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws;
018
019import java.io.IOException;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.LinkedBlockingDeque;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.activemq.transport.stomp.StompFrame;
026import org.eclipse.jetty.websocket.api.Session;
027import org.eclipse.jetty.websocket.api.WebSocketAdapter;
028import org.eclipse.jetty.websocket.api.WebSocketListener;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * STOMP over WS based Connection class
034 */
035public class StompWSConnection extends WebSocketAdapter implements WebSocketListener {
036
037    private static final Logger LOG = LoggerFactory.getLogger(StompWSConnection.class);
038
039    private Session connection;
040    private final CountDownLatch connectLatch = new CountDownLatch(1);
041
042    private final BlockingQueue<String> prefetch = new LinkedBlockingDeque<String>();
043
044    private int closeCode = -1;
045    private String closeMessage;
046
047    @Override
048    public boolean isConnected() {
049        return connection != null ? connection.isOpen() : false;
050    }
051
052    public void close() {
053        if (connection != null) {
054            connection.close();
055        }
056    }
057
058    protected Session getConnection() {
059        return connection;
060    }
061
062    //---- Send methods ------------------------------------------------------//
063
064    public synchronized void sendRawFrame(String rawFrame) throws Exception {
065        checkConnected();
066        connection.getRemote().sendString(rawFrame);
067    }
068
069    public synchronized void sendFrame(StompFrame frame) throws Exception {
070        checkConnected();
071        connection.getRemote().sendString(frame.format());
072    }
073
074    public synchronized void keepAlive() throws Exception {
075        checkConnected();
076        connection.getRemote().sendString("\n");
077    }
078
079    //----- Receive methods --------------------------------------------------//
080
081    public String receive() throws Exception {
082        checkConnected();
083        return prefetch.take();
084    }
085
086    public String receive(long timeout, TimeUnit unit) throws Exception {
087        checkConnected();
088        return prefetch.poll(timeout, unit);
089    }
090
091    public String receiveNoWait() throws Exception {
092        checkConnected();
093        return prefetch.poll();
094    }
095
096    //---- Blocking state change calls ---------------------------------------//
097
098    public void awaitConnection() throws InterruptedException {
099        connectLatch.await();
100    }
101
102    public boolean awaitConnection(long time, TimeUnit unit) throws InterruptedException {
103        return connectLatch.await(time, unit);
104    }
105
106    //----- Property Accessors -----------------------------------------------//
107
108    public int getCloseCode() {
109        return closeCode;
110    }
111
112    public String getCloseMessage() {
113        return closeMessage;
114    }
115
116    //----- WebSocket callback handlers --------------------------------------//
117
118    @Override
119    public void onWebSocketText(String data) {
120        if (data == null) {
121            return;
122        }
123
124        if (data.equals("\n")) {
125            LOG.debug("New incoming heartbeat read");
126        } else {
127            LOG.trace("New incoming STOMP Frame read: \n{}", data);
128            prefetch.add(data);
129        }
130    }
131
132
133    /* (non-Javadoc)
134     * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String)
135     */
136    @Override
137    public void onWebSocketClose(int statusCode, String reason) {
138        LOG.trace("STOMP WS Connection closed, code:{} message:{}", statusCode, reason);
139
140        this.connection = null;
141        this.closeCode = statusCode;
142        this.closeMessage = reason;
143
144    }
145
146    /* (non-Javadoc)
147     * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
148     */
149    @Override
150    public void onWebSocketConnect(
151            org.eclipse.jetty.websocket.api.Session session) {
152        this.connection = session;
153        this.connectLatch.countDown();
154    }
155
156    //----- Internal implementation ------------------------------------------//
157
158    private void checkConnected() throws IOException {
159        if (!isConnected()) {
160            throw new IOException("STOMP WS Connection is closed.");
161        }
162    }
163}