001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021import java.util.Map;
022import java.util.concurrent.CountDownLatch;
023
024import org.apache.activemq.broker.BrokerService;
025import org.apache.activemq.broker.BrokerServiceAware;
026import org.apache.activemq.command.Command;
027import org.apache.activemq.jms.pool.IntrospectionSupport;
028import org.apache.activemq.transport.TransportSupport;
029import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
030import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
031import org.apache.activemq.transport.mqtt.MQTTTransport;
032import org.apache.activemq.transport.mqtt.MQTTWireFormat;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.util.ServiceStopper;
035import org.fusesource.mqtt.codec.MQTTFrame;
036
037public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware {
038
039    protected volatile MQTTProtocolConverter protocolConverter = null;
040    protected MQTTWireFormat wireFormat = new MQTTWireFormat();
041    protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat);
042    protected final CountDownLatch socketTransportStarted = new CountDownLatch(1);
043    protected BrokerService brokerService;
044    protected volatile int receiveCounter;
045    protected final String remoteAddress;
046    protected X509Certificate[] peerCertificates;
047    private Map<String, Object> transportOptions;
048
049    public AbstractMQTTSocket(String remoteAddress) {
050        super();
051        this.remoteAddress = remoteAddress;
052    }
053
054    @Override
055    public void oneway(Object command) throws IOException {
056        try {
057            getProtocolConverter().onActiveMQCommand((Command)command);
058        } catch (Exception e) {
059            onException(IOExceptionSupport.create(e));
060        }
061    }
062
063    @Override
064    public void sendToActiveMQ(Command command) {
065        doConsume(command);
066    }
067
068    @Override
069    protected void doStop(ServiceStopper stopper) throws Exception {
070        mqttInactivityMonitor.stop();
071        handleStopped();
072    }
073
074    @Override
075    protected void doStart() throws Exception {
076        socketTransportStarted.countDown();
077        mqttInactivityMonitor.setTransportListener(getTransportListener());
078        mqttInactivityMonitor.startConnectChecker(wireFormat.getConnectAttemptTimeout());
079    }
080
081    //----- Abstract methods for subclasses to implement ---------------------//
082
083    @Override
084    public abstract void sendToMQTT(MQTTFrame command) throws IOException;
085
086    /**
087     * Called when the transport is stopping to allow the dervied classes
088     * a chance to close WebSocket resources.
089     *
090     * @throws IOException if an error occurs during the stop.
091     */
092    public abstract void handleStopped() throws IOException;
093
094    //----- Accessor methods -------------------------------------------------//
095
096    @Override
097    public MQTTInactivityMonitor getInactivityMonitor() {
098        return mqttInactivityMonitor;
099    }
100
101    @Override
102    public MQTTWireFormat getWireFormat() {
103        return wireFormat;
104    }
105
106    @Override
107    public String getRemoteAddress() {
108        return remoteAddress;
109    }
110
111    @Override
112    public int getReceiveCounter() {
113        return receiveCounter;
114    }
115
116    @Override
117    public X509Certificate[] getPeerCertificates() {
118        return peerCertificates;
119    }
120
121    @Override
122    public void setPeerCertificates(X509Certificate[] certificates) {
123        this.peerCertificates = certificates;
124    }
125
126    @Override
127    public void setBrokerService(BrokerService brokerService) {
128        this.brokerService = brokerService;
129    }
130
131    //----- Internal support methods -----------------------------------------//
132
133    protected MQTTProtocolConverter getProtocolConverter() {
134        if (protocolConverter == null) {
135            synchronized(this) {
136                if (protocolConverter == null) {
137                    protocolConverter = new MQTTProtocolConverter(this, brokerService);
138                    IntrospectionSupport.setProperties(protocolConverter, transportOptions);
139                }
140            }
141        }
142        return protocolConverter;
143    }
144
145    protected boolean transportStartedAtLeastOnce() {
146        return socketTransportStarted.getCount() == 0;
147    }
148
149    public void setTransportOptions(Map<String, Object> transportOptions) {
150        this.transportOptions = transportOptions;
151    }
152}