001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.ByteArrayInputStream;
020import java.io.DataOutputStream;
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.Socket;
024import java.net.URI;
025import java.net.UnknownHostException;
026import java.nio.ByteBuffer;
027import java.nio.channels.SelectionKey;
028import java.nio.channels.SocketChannel;
029
030import javax.net.SocketFactory;
031
032import org.apache.activemq.transport.Transport;
033import org.apache.activemq.transport.nio.NIOOutputStream;
034import org.apache.activemq.transport.nio.SelectorManager;
035import org.apache.activemq.transport.nio.SelectorSelection;
036import org.apache.activemq.transport.tcp.TcpTransport;
037import org.apache.activemq.util.IOExceptionSupport;
038import org.apache.activemq.util.ServiceStopper;
039import org.apache.activemq.wireformat.WireFormat;
040
041/**
042 * An implementation of the {@link Transport} interface for using Stomp over NIO
043 */
044public class StompNIOTransport extends TcpTransport {
045
046    private SocketChannel channel;
047    private SelectorSelection selection;
048
049    private ByteBuffer inputBuffer;
050    StompCodec codec;
051
052    public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
053        super(wireFormat, socketFactory, remoteLocation, localLocation);
054    }
055
056    public StompNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
057        super(wireFormat, socket);
058    }
059
060    @Override
061    protected void initializeStreams() throws IOException {
062        channel = socket.getChannel();
063        channel.configureBlocking(false);
064
065        // listen for events telling us when the socket is readable.
066        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
067            @Override
068            public void onSelect(SelectorSelection selection) {
069                serviceRead();
070            }
071
072            @Override
073            public void onError(SelectorSelection selection, Throwable error) {
074                if (error instanceof IOException) {
075                    onException((IOException)error);
076                } else {
077                    onException(IOExceptionSupport.create(error));
078                }
079            }
080        });
081
082        inputBuffer = ByteBuffer.allocate(8 * 1024);
083        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
084        this.dataOut = new DataOutputStream(outPutStream);
085        this.buffOut = outPutStream;
086        codec = new StompCodec(this);
087    }
088
089    private void serviceRead() {
090        try {
091
092           while (true) {
093               // read channel
094               int readSize = channel.read(inputBuffer);
095               // channel is closed, cleanup
096               if (readSize == -1) {
097                   onException(new EOFException());
098                   selection.close();
099                   break;
100               }
101
102               // nothing more to read, break
103               if (readSize == 0) {
104                   break;
105               }
106
107               receiveCounter += readSize;
108
109               inputBuffer.flip();
110
111               ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
112               codec.parse(input, readSize);
113
114               // clear the buffer
115               inputBuffer.clear();
116           }
117        } catch (IOException e) {
118            onException(e);
119        } catch (Throwable e) {
120            onException(IOExceptionSupport.create(e));
121        }
122    }
123
124    @Override
125    protected void doStart() throws Exception {
126        connect();
127        selection.setInterestOps(SelectionKey.OP_READ);
128        selection.enable();
129    }
130
131    @Override
132    protected void doStop(ServiceStopper stopper) throws Exception {
133        try {
134            if (selection != null) {
135                selection.close();
136            }
137        } finally {
138            super.doStop(stopper);
139        }
140    }
141}