/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.transport.stomp;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal blocking STOMP client built directly on a {@link Socket}.
 *
 * <p>Outgoing frames are produced with {@link StompFrame#format()} and written
 * to the socket as UTF-8. Incoming data can be read either as a parsed
 * {@link StompFrame} ({@link #receive()}) or as the raw frame text
 * ({@link #receiveFrame()}).
 *
 * <p>The class performs no synchronization of its own and most methods throw a
 * {@link NullPointerException} when called before a socket has been supplied
 * through {@link #open(String, int)}, {@link #open(Socket)} or
 * {@link #setStompSocket(Socket)}.
 */
public class StompConnection {

    /** Default read timeout, in milliseconds, used by the no-argument receive methods. */
    public static final long RECEIVE_TIMEOUT = 10000;

    /** Character encoding used for every byte/String conversion in this class. */
    private static final String UTF8 = "UTF-8";

    private Socket stompSocket;

    /**
     * Bytes read so far by {@link #receiveFrame(long)}. Kept as a field so that
     * data read before a failed or timed-out read is still there on the next call.
     */
    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();

    private String version = Stomp.DEFAULT_VERSION;

    /**
     * Opens a new socket to the given host and port and uses it for this connection.
     *
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     * @throws UnknownHostException if the host cannot be resolved
     * @throws IOException if the socket cannot be created
     */
    public void open(String host, int port) throws IOException, UnknownHostException {
        open(new Socket(host, port));
    }

    /**
     * Uses an already created socket for this connection.
     *
     * @param socket the socket to read from and write to
     */
    public void open(Socket socket) {
        stompSocket = socket;
    }

    /**
     * Closes the underlying socket, if any, and forgets it. Calling this method
     * when no socket is set is a no-op.
     *
     * @throws IOException if closing the socket fails
     */
    public void close() throws IOException {
        if (stompSocket != null) {
            stompSocket.close();
            stompSocket = null;
        }
    }

    /**
     * Writes the given text to the socket, encoded as UTF-8, and flushes.
     *
     * @param data the complete frame text to send
     * @throws Exception if the write fails
     */
    public void sendFrame(String data) throws Exception {
        byte[] bytes = data.getBytes(UTF8);
        OutputStream outputStream = stompSocket.getOutputStream();
        outputStream.write(bytes);
        outputStream.flush();
    }

    /**
     * Writes the given text (encoded as UTF-8) immediately followed by the raw
     * {@code data} bytes, then flushes.
     *
     * @param frame the textual part of the frame
     * @param data raw bytes appended unchanged after the textual part
     * @throws Exception if the write fails
     */
    public void sendFrame(String frame, byte[] data) throws Exception {
        byte[] bytes = frame.getBytes(UTF8);
        OutputStream outputStream = stompSocket.getOutputStream();
        outputStream.write(bytes);
        outputStream.write(data);
        outputStream.flush();
    }

    /**
     * Reads and parses the next frame using {@link #RECEIVE_TIMEOUT}.
     *
     * @return the parsed frame
     * @throws Exception if reading or parsing fails
     */
    public StompFrame receive() throws Exception {
        return receive(RECEIVE_TIMEOUT);
    }

    /**
     * Reads the next frame from the socket and parses it with a fresh
     * {@link StompWireFormat} configured for this connection's version.
     *
     * @param timeOut socket read timeout in milliseconds; values beyond the
     *        {@code int} range are clamped instead of silently wrapping
     * @return the parsed frame
     * @throws Exception if reading or parsing fails
     */
    public StompFrame receive(long timeOut) throws Exception {
        stompSocket.setSoTimeout(toSocketTimeout(timeOut));
        InputStream is = stompSocket.getInputStream();
        StompWireFormat wf = new StompWireFormat();
        wf.setStompVersion(version);
        DataInputStream dis = new DataInputStream(is);
        return (StompFrame) wf.unmarshal(dis);
    }

    /**
     * Reads the next raw frame as text using {@link #RECEIVE_TIMEOUT}.
     *
     * @return the frame text, without the terminating NUL and newline
     * @throws Exception if reading fails
     */
    public String receiveFrame() throws Exception {
        return receiveFrame(RECEIVE_TIMEOUT);
    }

    /**
     * Reads bytes from the socket until a NUL byte directly followed by a
     * newline is seen, and returns everything read before that terminator
     * decoded as UTF-8. A NUL followed by any other byte is treated as frame
     * content and both bytes are kept.
     *
     * <p>If the read fails part-way (for example on a timeout), the bytes read
     * so far stay in the internal buffer and are included in the result of a
     * later call.
     *
     * @param timeOut socket read timeout in milliseconds; values beyond the
     *        {@code int} range are clamped instead of silently wrapping
     * @return the frame text, without the terminating NUL and newline
     * @throws IOException with message {@code "socket closed."} if the stream
     *         ends before a complete frame was read
     * @throws Exception if reading fails for any other reason
     */
    public String receiveFrame(long timeOut) throws Exception {
        stompSocket.setSoTimeout(toSocketTimeout(timeOut));
        InputStream is = stompSocket.getInputStream();
        for (;;) {
            int c = is.read();
            if (c < 0) {
                throw new IOException("socket closed.");
            }
            if (c != 0) {
                inputBuffer.write(c);
                continue;
            }

            // A NUL was read: the frame ends only if a newline follows directly.
            int next = is.read();
            if (next == '\n') {
                // end of frame
                return stringFromBuffer(inputBuffer);
            }
            if (next < 0) {
                // Stream ended right after the NUL; do not store the -1 marker as data.
                throw new IOException("socket closed.");
            }
            inputBuffer.write(0);
            inputBuffer.write(next);
        }
    }

    /**
     * Converts a millisecond timeout to the {@code int} expected by
     * {@link Socket#setSoTimeout(int)}, clamping to the {@code int} range so
     * that out-of-range values cannot wrap around to an unrelated timeout.
     */
    private static int toSocketTimeout(long timeOut) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeOut));
    }

    /** Drains the buffer: returns its content decoded as UTF-8 and resets it. */
    private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
        byte[] ba = inputBuffer.toByteArray();
        inputBuffer.reset();
        return new String(ba, UTF8);
    }

    /**
     * @return the socket currently in use, or {@code null} if none is set
     */
    public Socket getStompSocket() {
        return stompSocket;
    }

    /**
     * Replaces the socket used by this connection. The previous socket, if any,
     * is not closed.
     *
     * @param stompSocket the socket to use from now on
     */
    public void setStompSocket(Socket stompSocket) {
        this.stompSocket = stompSocket;
    }

    /**
     * Sends a CONNECT frame with the given credentials and waits for the reply.
     *
     * @param username value of the {@code login} header
     * @param password value of the {@code passcode} header
     * @throws Exception if the reply is not a CONNECTED frame or I/O fails
     */
    public void connect(String username, String password) throws Exception {
        connect(username, password, null);
    }

    /**
     * Sends a CONNECT frame with the given credentials and optional client id
     * and waits for the reply.
     *
     * @param username value of the {@code login} header
     * @param password value of the {@code passcode} header
     * @param client value of the {@code client-id} header; omitted when {@code null}
     * @throws Exception if the reply is not a CONNECTED frame or I/O fails
     */
    public void connect(String username, String password, String client) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("login", username);
        headers.put("passcode", password);
        if (client != null) {
            headers.put("client-id", client);
        }
        connect(headers);
    }

    /**
     * Sends a CONNECT frame carrying the given headers and reads one frame in
     * reply using the default timeout.
     *
     * @param headers headers to place on the CONNECT frame
     * @throws Exception with message {@code "Not connected: <body>"} if the
     *         reply is not a CONNECTED frame, or if I/O fails
     */
    public void connect(HashMap<String, String> headers) throws Exception {
        StompFrame frame = new StompFrame("CONNECT", headers);
        sendFrame(frame.format());

        StompFrame response = receive();
        if (!response.getAction().equals(Stomp.Responses.CONNECTED)) {
            throw new Exception("Not connected: " + response.getBody());
        }
    }

    /**
     * Sends a DISCONNECT frame without requesting a receipt. The socket is left open.
     *
     * @throws Exception if the write fails
     */
    public void disconnect() throws Exception {
        disconnect(null);
    }

    /**
     * Sends a DISCONNECT frame. The socket is left open; use {@link #close()}
     * to release it.
     *
     * @param receiptId receipt id to request; no receipt header is added when
     *        {@code null} or empty
     * @throws Exception if the write fails
     */
    public void disconnect(String receiptId) throws Exception {
        StompFrame frame = new StompFrame("DISCONNECT");
        if (receiptId != null && !receiptId.isEmpty()) {
            frame.getHeaders().put(Stomp.Headers.RECEIPT_REQUESTED, receiptId);
        }
        sendFrame(frame.format());
    }

    /**
     * Sends a text message to the given destination outside any transaction.
     *
     * @param destination value of the {@code destination} header
     * @param message message body, encoded as UTF-8
     * @throws Exception if the write fails
     */
    public void send(String destination, String message) throws Exception {
        send(destination, message, null, null);
    }

    /**
     * Sends a SEND frame with the given body and headers.
     *
     * @param destination value of the {@code destination} header
     * @param message message body; encoded as UTF-8 so that it matches the
     *        encoding used when the frame is written to the socket
     * @param transaction value of the {@code transaction} header; omitted when {@code null}
     * @param headers additional headers, may be {@code null}; a non-null map is
     *        modified in place ({@code destination} and {@code transaction} are added)
     * @throws Exception if the write fails
     */
    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
        if (headers == null) {
            headers = new HashMap<String, String>();
        }
        headers.put("destination", destination);
        if (transaction != null) {
            headers.put("transaction", transaction);
        }
        // Explicit charset: the platform default may differ from the UTF-8 used on the wire.
        StompFrame frame = new StompFrame("SEND", headers, message.getBytes(UTF8));
        sendFrame(frame.format());
    }

    /**
     * Sends a SUBSCRIBE frame for the given destination with no {@code ack} header.
     *
     * @param destination value of the {@code destination} header
     * @throws Exception if the write fails
     */
    public void subscribe(String destination) throws Exception {
        subscribe(destination, null, null);
    }

    /**
     * Sends a SUBSCRIBE frame for the given destination.
     *
     * @param destination value of the {@code destination} header
     * @param ack value of the {@code ack} header; omitted when {@code null}
     * @throws Exception if the write fails
     */
    public void subscribe(String destination, String ack) throws Exception {
        subscribe(destination, ack, new HashMap<String, String>());
    }

    /**
     * Sends a SUBSCRIBE frame for the given destination.
     *
     * @param destination value of the {@code destination} header
     * @param ack value of the {@code ack} header; omitted when {@code null}
     * @param headers additional headers, may be {@code null}; a non-null map is
     *        modified in place
     * @throws Exception if the write fails
     */
    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
        if (headers == null) {
            headers = new HashMap<String, String>();
        }
        headers.put("destination", destination);
        if (ack != null) {
            headers.put("ack", ack);
        }
        StompFrame frame = new StompFrame("SUBSCRIBE", headers);
        sendFrame(frame.format());
    }

    /**
     * Sends an UNSUBSCRIBE frame for the given destination.
     *
     * @param destination value of the {@code destination} header
     * @throws Exception if the write fails
     */
    public void unsubscribe(String destination) throws Exception {
        unsubscribe(destination, null);
    }

    /**
     * Sends an UNSUBSCRIBE frame for the given destination.
     *
     * @param destination value of the {@code destination} header
     * @param headers additional headers, may be {@code null}; a non-null map is
     *        modified in place
     * @throws Exception if the write fails
     */
    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
        if (headers == null) {
            headers = new HashMap<String, String>();
        }
        headers.put("destination", destination);
        StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
        sendFrame(frame.format());
    }

    /**
     * Sends a BEGIN frame for the given transaction.
     *
     * @param transaction value of the {@code transaction} header
     * @throws Exception if the write fails
     */
    public void begin(String transaction) throws Exception {
        sendTransactionFrame("BEGIN", transaction);
    }

    /**
     * Sends an ABORT frame for the given transaction.
     *
     * @param transaction value of the {@code transaction} header
     * @throws Exception if the write fails
     */
    public void abort(String transaction) throws Exception {
        sendTransactionFrame("ABORT", transaction);
    }

    /**
     * Sends a COMMIT frame for the given transaction.
     *
     * @param transaction value of the {@code transaction} header
     * @throws Exception if the write fails
     */
    public void commit(String transaction) throws Exception {
        sendTransactionFrame("COMMIT", transaction);
    }

    /** Sends a frame with the given command whose only header is {@code transaction}. */
    private void sendTransactionFrame(String command, String transaction) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("transaction", transaction);
        StompFrame frame = new StompFrame(command, headers);
        sendFrame(frame.format());
    }

    /**
     * Acknowledges the given frame, identified by its {@code message-id} header.
     *
     * @param frame the received frame to acknowledge
     * @throws Exception if the write fails
     */
    public void ack(StompFrame frame) throws Exception {
        ack(frame.getHeaders().get("message-id"), null);
    }

    /**
     * Acknowledges the given frame, identified by its {@code message-id}
     * header, as part of a transaction.
     *
     * @param frame the received frame to acknowledge
     * @param transaction value of the {@code transaction} header; omitted when {@code null}
     * @throws Exception if the write fails
     */
    public void ack(StompFrame frame, String transaction) throws Exception {
        ack(frame.getHeaders().get("message-id"), transaction);
    }

    /**
     * Sends an ACK frame for the given message id.
     *
     * @param messageId value of the {@code message-id} header
     * @throws Exception if the write fails
     */
    public void ack(String messageId) throws Exception {
        ack(messageId, null);
    }

    /**
     * Sends an ACK frame for the given message id.
     *
     * @param messageId value of the {@code message-id} header
     * @param transaction value of the {@code transaction} header; omitted when {@code null}
     * @throws Exception if the write fails
     */
    public void ack(String messageId, String transaction) throws Exception {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("message-id", messageId);
        if (transaction != null) {
            headers.put("transaction", transaction);
        }
        StompFrame frame = new StompFrame("ACK", headers);
        sendFrame(frame.format());
    }

    /**
     * Writes a single newline byte to the socket and flushes.
     *
     * @throws Exception if the write fails
     */
    public void keepAlive() throws Exception {
        OutputStream outputStream = stompSocket.getOutputStream();
        outputStream.write('\n');
        outputStream.flush();
    }

    /**
     * Renders the given headers as {@code key:value} lines, one per entry in
     * the map's iteration order, followed by one empty line.
     *
     * @param headers headers to render
     * @return the rendered header section
     */
    protected String appendHeaders(HashMap<String, Object> headers) {
        StringBuilder result = new StringBuilder();
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            result.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        result.append("\n");
        return result.toString();
    }

    /**
     * @return the STOMP version handed to the wire format in {@link #receive(long)}
     */
    public String getVersion() {
        return version;
    }

    /**
     * Sets the STOMP version handed to the wire format in {@link #receive(long)}.
     *
     * @param version the version string to use
     */
    public void setVersion(String version) {
        this.version = version;
    }
}