001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.ws; 018 019import java.io.IOException; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.CountDownLatch; 022import java.util.concurrent.LinkedBlockingDeque; 023import java.util.concurrent.TimeUnit; 024 025import org.apache.activemq.transport.stomp.StompFrame; 026import org.eclipse.jetty.websocket.api.Session; 027import org.eclipse.jetty.websocket.api.WebSocketAdapter; 028import org.eclipse.jetty.websocket.api.WebSocketListener; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * STOMP over WS based Connection class 034 */ 035public class StompWSConnection extends WebSocketAdapter implements WebSocketListener { 036 037 private static final Logger LOG = LoggerFactory.getLogger(StompWSConnection.class); 038 039 private Session connection; 040 private final CountDownLatch connectLatch = new CountDownLatch(1); 041 042 private final BlockingQueue<String> prefetch = new LinkedBlockingDeque<String>(); 043 044 private int closeCode = -1; 045 private String closeMessage; 046 047 @Override 048 public boolean isConnected() { 049 return connection != null ? connection.isOpen() : false; 050 } 051 052 public void close() { 053 if (connection != null) { 054 connection.close(); 055 } 056 } 057 058 protected Session getConnection() { 059 return connection; 060 } 061 062 //---- Send methods ------------------------------------------------------// 063 064 public synchronized void sendRawFrame(String rawFrame) throws Exception { 065 checkConnected(); 066 connection.getRemote().sendString(rawFrame); 067 } 068 069 public synchronized void sendFrame(StompFrame frame) throws Exception { 070 checkConnected(); 071 connection.getRemote().sendString(frame.format()); 072 } 073 074 public synchronized void keepAlive() throws Exception { 075 checkConnected(); 076 connection.getRemote().sendString("\n"); 077 } 078 079 //----- Receive methods --------------------------------------------------// 080 081 public String receive() throws Exception { 082 checkConnected(); 083 return prefetch.take(); 084 } 085 086 public String receive(long timeout, TimeUnit unit) throws Exception { 087 checkConnected(); 088 return prefetch.poll(timeout, unit); 089 } 090 091 public String receiveNoWait() throws Exception { 092 checkConnected(); 093 return prefetch.poll(); 094 } 095 096 //---- Blocking state change calls ---------------------------------------// 097 098 public void awaitConnection() throws InterruptedException { 099 connectLatch.await(); 100 } 101 102 public boolean awaitConnection(long time, TimeUnit unit) throws InterruptedException { 103 return connectLatch.await(time, unit); 104 } 105 106 //----- Property Accessors -----------------------------------------------// 107 108 public int getCloseCode() { 109 return closeCode; 110 } 111 112 public String getCloseMessage() { 113 return closeMessage; 114 } 115 116 //----- WebSocket callback handlers --------------------------------------// 117 118 @Override 119 public void onWebSocketText(String data) { 120 if (data == null) { 121 return; 122 } 123 124 if (data.equals("\n")) { 125 LOG.debug("New incoming heartbeat read"); 126 } else { 127 LOG.trace("New incoming STOMP Frame read: \n{}", data); 128 prefetch.add(data); 129 } 130 } 131 132 133 /* (non-Javadoc) 134 * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String) 135 */ 136 @Override 137 public void onWebSocketClose(int statusCode, String reason) { 138 LOG.trace("STOMP WS Connection closed, code:{} message:{}", statusCode, reason); 139 140 this.connection = null; 141 this.closeCode = statusCode; 142 this.closeMessage = reason; 143 144 } 145 146 /* (non-Javadoc) 147 * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session) 148 */ 149 @Override 150 public void onWebSocketConnect( 151 org.eclipse.jetty.websocket.api.Session session) { 152 this.connection = session; 153 this.connectLatch.countDown(); 154 } 155 156 //----- Internal implementation ------------------------------------------// 157 158 private void checkConnected() throws IOException { 159 if (!isConnected()) { 160 throw new IOException("STOMP WS Connection is closed."); 161 } 162 } 163}