/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.ws;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.stomp.ProtocolConverter;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.stomp.StompInactivityMonitor;
import org.apache.activemq.transport.stomp.StompTransport;
import org.apache.activemq.transport.stomp.StompWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of a STOMP based WebSocket handler.
 * <p>
 * Text frames handed to {@link #processStompFrame(String)} are unmarshalled
 * with the {@link StompWireFormat} and passed to the {@link ProtocolConverter}.
 * Every interaction with the converter in this class ({@link #oneway(Object)},
 * {@link #sendToActiveMQ(Command)} and {@link #processStompFrame(String)}) is
 * performed while holding {@link #protocolLock}.
 */
public abstract class AbstractStompSocket extends TransportSupport implements StompTransport {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractStompSocket.class);

    /** Guards all calls into the protocol converter made by this class. */
    protected ReentrantLock protocolLock = new ReentrantLock();
    protected ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
    protected StompWireFormat wireFormat = new StompWireFormat();
    /** Released by {@link #doStart()}; incoming frames wait on it before being processed. */
    protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
    protected final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
    /** Running total of the lengths (in chars) of the frames passed to {@link #processStompFrame(String)}. */
    protected volatile int receiveCounter;
    protected final String remoteAddress;
    /** Certificates attached to every unmarshalled frame; {@code null} until set. */
    protected X509Certificate[] certificates;

    /**
     * Creates the socket handler.
     *
     * @param remoteAddress the value later returned by {@link #getRemoteAddress()}.
     */
    public AbstractStompSocket(String remoteAddress) {
        super();
        this.remoteAddress = remoteAddress;
    }

    /**
     * Passes a command to the protocol converter via
     * {@code onActiveMQCommand}.
     * <p>
     * Failures (including a {@code command} that is not a {@link Command}) are
     * not thrown; they are wrapped with {@link IOExceptionSupport#create} and
     * reported through {@code onException}.
     *
     * @param command the command to convert; expected to be a {@link Command}.
     */
    @Override
    public void oneway(Object command) throws IOException {
        protocolLock.lock();
        try {
            protocolConverter.onActiveMQCommand((Command) command);
        } catch (Exception e) {
            onException(IOExceptionSupport.create(e));
        } finally {
            protocolLock.unlock();
        }
    }

    /**
     * Delivers a command to this transport's consumer via {@code doConsume}
     * while holding the protocol lock.
     *
     * @param command the command to consume.
     */
    @Override
    public void sendToActiveMQ(Command command) {
        protocolLock.lock();
        try {
            doConsume(command);
        } finally {
            protocolLock.unlock();
        }
    }

    /**
     * Stops the inactivity monitor and then lets the subclass release its
     * WebSocket resources.
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        try {
            stompInactivityMonitor.stop();
        } finally {
            // Always give the subclass its chance to close the WebSocket,
            // even when stopping the monitor fails.
            handleStopped();
        }
    }

    /**
     * Releases any threads waiting in {@link #processStompFrame(String)},
     * then wires the transport listener into the inactivity monitor and starts
     * its connect check task.
     */
    @Override
    protected void doStart() throws Exception {
        socketTransportStarted.countDown();
        stompInactivityMonitor.setTransportListener(getTransportListener());
        stompInactivityMonitor.startConnectCheckTask();
    }

    //----- Abstract methods for subclasses to implement ---------------------//

    @Override
    public abstract void sendToStomp(StompFrame command) throws IOException;

    /**
     * Called when the transport is stopping to allow the derived classes
     * a chance to close WebSocket resources.
     *
     * @throws IOException if an error occurs during the stop.
     */
    public abstract void handleStopped() throws IOException;

    //----- Accessor methods -------------------------------------------------//

    @Override
    public StompInactivityMonitor getInactivityMonitor() {
        return stompInactivityMonitor;
    }

    @Override
    public StompWireFormat getWireFormat() {
        return wireFormat;
    }

    @Override
    public String getRemoteAddress() {
        return remoteAddress;
    }

    @Override
    public int getReceiveCounter() {
        return receiveCounter;
    }

    //----- Internal implementation ------------------------------------------//

    /**
     * Processes one text frame.
     * <p>
     * If {@link #doStart()} has not yet run, the calling thread blocks until it
     * does. A {@code null} frame is ignored. A frame consisting of a single
     * newline is reported to the inactivity monitor as a {@link KeepAliveInfo};
     * anything else is unmarshalled as a {@link StompFrame}, tagged with the
     * current certificates and handed to the protocol converter. Any exception
     * raised while handling the frame is wrapped with
     * {@link IOExceptionSupport#create} and reported through
     * {@code onException}.
     *
     * @param data the frame text, may be {@code null}.
     */
    protected void processStompFrame(String data) {
        if (!transportStartedAtLeastOnce()) {
            LOG.debug("Waiting for StompSocket to be properly started...");
            try {
                socketTransportStarted.await();
            } catch (InterruptedException e) {
                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
                // Restore the interrupt status so code further up the stack
                // can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
            }
        }

        protocolLock.lock();
        try {
            if (data != null) {
                receiveCounter += data.length();

                if (data.equals("\n")) {
                    stompInactivityMonitor.onCommand(new KeepAliveInfo());
                } else {
                    ByteSequence payload = new ByteSequence(data.getBytes(StandardCharsets.UTF_8));
                    StompFrame frame = (StompFrame) wireFormat.unmarshal(payload);
                    frame.setTransportContext(getCertificates());
                    protocolConverter.onStompCommand(frame);
                }
            }
        } catch (Exception e) {
            onException(IOExceptionSupport.create(e));
        } finally {
            protocolLock.unlock();
        }
    }

    /**
     * @return {@code true} once {@link #doStart()} has released the start latch.
     */
    private boolean transportStartedAtLeastOnce() {
        return socketTransportStarted.getCount() == 0;
    }

    /**
     * @return the certificates last supplied to
     *         {@link #setCertificates(X509Certificate[])}, or {@code null} if
     *         none were set.
     */
    public X509Certificate[] getCertificates() {
        return certificates;
    }

    /**
     * @param certificates the certificates to attach to subsequently
     *        unmarshalled frames as their transport context.
     */
    public void setCertificates(X509Certificate[] certificates) {
        this.certificates = certificates;
    }
}