001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.DataOutputStream;
020import java.io.EOFException;
021import java.io.IOException;
022import java.net.Socket;
023import java.net.URI;
024import java.net.UnknownHostException;
025import java.nio.ByteBuffer;
026import java.nio.channels.SelectionKey;
027import java.nio.channels.SocketChannel;
028
029import javax.net.SocketFactory;
030
031import org.apache.activemq.transport.nio.NIOOutputStream;
032import org.apache.activemq.transport.nio.SelectorManager;
033import org.apache.activemq.transport.nio.SelectorSelection;
034import org.apache.activemq.transport.tcp.TcpTransport;
035import org.apache.activemq.util.IOExceptionSupport;
036import org.apache.activemq.util.ServiceStopper;
037import org.apache.activemq.wireformat.WireFormat;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
043 */
044public class AmqpNioTransport extends TcpTransport {
045
046    private static final Logger LOG = LoggerFactory.getLogger(AmqpNioTransport.class);
047
048    private SocketChannel channel;
049    private SelectorSelection selection;
050    private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
051
052    private ByteBuffer inputBuffer;
053
054    public AmqpNioTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
055        super(wireFormat, socketFactory, remoteLocation, localLocation);
056
057        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
058    }
059
060    public AmqpNioTransport(WireFormat wireFormat, Socket socket) throws IOException {
061        super(wireFormat, socket);
062
063        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
064    }
065
066    @Override
067    protected void initializeStreams() throws IOException {
068        channel = socket.getChannel();
069        channel.configureBlocking(false);
070        // listen for events telling us when the socket is readable.
071        selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
072            @Override
073            public void onSelect(SelectorSelection selection) {
074                if (!isStopped()) {
075                    serviceRead();
076                }
077            }
078
079            @Override
080            public void onError(SelectorSelection selection, Throwable error) {
081                LOG.trace("Error detected: {}", error.getMessage());
082                if (error instanceof IOException) {
083                    onException((IOException) error);
084                } else {
085                    onException(IOExceptionSupport.create(error));
086                }
087            }
088        });
089
090        inputBuffer = ByteBuffer.allocate(8 * 1024);
091        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
092        this.dataOut = new DataOutputStream(outPutStream);
093        this.buffOut = outPutStream;
094    }
095
096    boolean magicRead = false;
097
098    private void serviceRead() {
099        try {
100
101            while (isStarted()) {
102                // read channel
103                int readSize = channel.read(inputBuffer);
104                // channel is closed, cleanup
105                if (readSize == -1) {
106                    onException(new EOFException());
107                    selection.close();
108                    break;
109                }
110                // nothing more to read, break
111                if (readSize == 0) {
112                    break;
113                }
114
115                receiveCounter += readSize;
116
117                inputBuffer.flip();
118                frameReader.parse(inputBuffer);
119                inputBuffer.clear();
120            }
121        } catch (IOException e) {
122            onException(e);
123        } catch (Throwable e) {
124            onException(IOExceptionSupport.create(e));
125        }
126    }
127
128    @Override
129    protected void doStart() throws Exception {
130        connect();
131        selection.setInterestOps(SelectionKey.OP_READ);
132        selection.enable();
133    }
134
135    @Override
136    protected void doStop(ServiceStopper stopper) throws Exception {
137        try {
138            if (selection != null) {
139                selection.close();
140            }
141        } finally {
142            super.doStop(stopper);
143        }
144    }
145}