001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.locks.ReentrantLock;
023
024import org.apache.activemq.command.Command;
025import org.apache.activemq.command.KeepAliveInfo;
026import org.apache.activemq.transport.TransportSupport;
027import org.apache.activemq.transport.stomp.ProtocolConverter;
028import org.apache.activemq.transport.stomp.StompFrame;
029import org.apache.activemq.transport.stomp.StompInactivityMonitor;
030import org.apache.activemq.transport.stomp.StompTransport;
031import org.apache.activemq.transport.stomp.StompWireFormat;
032import org.apache.activemq.util.ByteSequence;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.util.ServiceStopper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Base implementation of a STOMP based WebSocket handler.
040 */
041public abstract class AbstractStompSocket extends TransportSupport implements StompTransport {
042
043    private static final Logger LOG = LoggerFactory.getLogger(AbstractStompSocket.class);
044
045    protected ReentrantLock protocolLock = new ReentrantLock();
046    protected ProtocolConverter protocolConverter = new ProtocolConverter(this, null);
047    protected StompWireFormat wireFormat = new StompWireFormat();
048    protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
049    protected final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
050    protected volatile int receiveCounter;
051    protected final String remoteAddress;
052    protected X509Certificate[] certificates;
053
054    public AbstractStompSocket(String remoteAddress) {
055        super();
056        this.remoteAddress = remoteAddress;
057    }
058
059    @Override
060    public void oneway(Object command) throws IOException {
061        protocolLock.lock();
062        try {
063            protocolConverter.onActiveMQCommand((Command)command);
064        } catch (Exception e) {
065            onException(IOExceptionSupport.create(e));
066        } finally {
067            protocolLock.unlock();
068        }
069    }
070
071    @Override
072    public void sendToActiveMQ(Command command) {
073        protocolLock.lock();
074        try {
075            doConsume(command);
076        } finally {
077            protocolLock.unlock();
078        }
079    }
080
081    @Override
082    protected void doStop(ServiceStopper stopper) throws Exception {
083        stompInactivityMonitor.stop();
084        handleStopped();
085    }
086
087    @Override
088    protected void doStart() throws Exception {
089        socketTransportStarted.countDown();
090        stompInactivityMonitor.setTransportListener(getTransportListener());
091        stompInactivityMonitor.startConnectCheckTask();
092    }
093
094    //----- Abstract methods for subclasses to implement ---------------------//
095
096    @Override
097    public abstract void sendToStomp(StompFrame command) throws IOException;
098
099    /**
100     * Called when the transport is stopping to allow the dervied classes
101     * a chance to close WebSocket resources.
102     *
103     * @throws IOException if an error occurs during the stop.
104     */
105    public abstract void handleStopped() throws IOException;
106
107    //----- Accessor methods -------------------------------------------------//
108
109    @Override
110    public StompInactivityMonitor getInactivityMonitor() {
111        return stompInactivityMonitor;
112    }
113
114    @Override
115    public StompWireFormat getWireFormat() {
116        return wireFormat;
117    }
118
119    @Override
120    public String getRemoteAddress() {
121        return remoteAddress;
122    }
123
124    @Override
125    public int getReceiveCounter() {
126        return receiveCounter;
127    }
128
129    //----- Internal implementation ------------------------------------------//
130
131    protected void processStompFrame(String data) {
132        if (!transportStartedAtLeastOnce()) {
133            LOG.debug("Waiting for StompSocket to be properly started...");
134            try {
135                socketTransportStarted.await();
136            } catch (InterruptedException e) {
137                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
138            }
139        }
140
141        protocolLock.lock();
142        try {
143            if (data != null) {
144                receiveCounter += data.length();
145
146                if (data.equals("\n")) {
147                    stompInactivityMonitor.onCommand(new KeepAliveInfo());
148                } else {
149                    StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")));
150                    frame.setTransportContext(getCertificates());
151                    protocolConverter.onStompCommand(frame);
152                }
153            }
154        } catch (Exception e) {
155            onException(IOExceptionSupport.create(e));
156        } finally {
157            protocolLock.unlock();
158        }
159    }
160
161    private boolean transportStartedAtLeastOnce() {
162        return socketTransportStarted.getCount() == 0;
163    }
164
165    public X509Certificate[] getCertificates() {
166        return certificates;
167    }
168
169    public void setCertificates(X509Certificate[] certificates) {
170        this.certificates = certificates;
171    }
172}