001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws.jetty9;
018
019import java.io.IOException;
020import java.nio.ByteBuffer;
021
022import org.apache.activemq.transport.ws.AbstractMQTTSocket;
023import org.apache.activemq.util.ByteSequence;
024import org.apache.activemq.util.IOExceptionSupport;
025import org.eclipse.jetty.websocket.api.Session;
026import org.eclipse.jetty.websocket.api.WebSocketListener;
027import org.fusesource.mqtt.codec.DISCONNECT;
028import org.fusesource.mqtt.codec.MQTTFrame;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener {
033
034    private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
035
036    private Session session;
037
038    public MQTTSocket(String remoteAddress) {
039        super(remoteAddress);
040    }
041
042    @Override
043    public void sendToMQTT(MQTTFrame command) throws IOException {
044        ByteSequence bytes = wireFormat.marshal(command);
045        session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
046    }
047
048    @Override
049    public void handleStopped() throws IOException {
050        if (session != null && session.isOpen()) {
051            session.close();
052        }
053    }
054
055    //----- WebSocket.OnTextMessage callback handlers ------------------------//
056
057    @Override
058    public void onWebSocketBinary(byte[] bytes, int offset, int length) {
059        if (!transportStartedAtLeastOnce()) {
060            LOG.debug("Waiting for MQTTSocket to be properly started...");
061            try {
062                socketTransportStarted.await();
063            } catch (InterruptedException e) {
064                LOG.warn("While waiting for MQTTSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
065            }
066        }
067
068        receiveCounter += length;
069
070        try {
071            MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
072            getProtocolConverter().onMQTTCommand(frame);
073        } catch (Exception e) {
074            onException(IOExceptionSupport.create(e));
075        }
076    }
077
078    @Override
079    public void onWebSocketClose(int arg0, String arg1) {
080        try {
081            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
082        } catch (Exception e) {
083            LOG.warn("Failed to close WebSocket", e);
084        }
085    }
086
087    @Override
088    public void onWebSocketConnect(Session session) {
089        this.session = session;
090    }
091
092    @Override
093    public void onWebSocketError(Throwable arg0) {
094
095    }
096
097    @Override
098    public void onWebSocketText(String arg0) {
099    }
100}