001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.net.InetSocketAddress;
021import java.net.URI;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import org.apache.activemq.command.BrokerInfo;
026import org.apache.activemq.transport.MutexTransport;
027import org.apache.activemq.transport.ResponseCorrelator;
028import org.apache.activemq.transport.Transport;
029import org.apache.activemq.transport.TransportAcceptListener;
030import org.apache.activemq.transport.TransportServer;
031
032/**
033 * Broker side of the VMTransport
034 */
035public class VMTransportServer implements TransportServer {
036
037    private TransportAcceptListener acceptListener;
038    private final URI location;
039    private AtomicBoolean disposed = new AtomicBoolean(false);
040
041    private final AtomicInteger connectionCount = new AtomicInteger(0);
042    private final boolean disposeOnDisconnect;
043    private boolean allowLinkStealing;
044
045    /**
046     * @param location
047     * @param disposeOnDisconnect
048     */
049    public VMTransportServer(URI location, boolean disposeOnDisconnect) {
050        this.location = location;
051        this.disposeOnDisconnect = disposeOnDisconnect;
052    }
053
054    /**
055     * @return a pretty print of this
056     */
057    public String toString() {
058        return "VMTransportServer(" + location + ")";
059    }
060
061    /**
062     * @return new VMTransport
063     * @throws IOException
064     */
065    public VMTransport connect() throws IOException {
066        TransportAcceptListener al;
067        synchronized (this) {
068            if (disposed.get()) {
069                throw new IOException("Server has been disposed.");
070            }
071            al = acceptListener;
072        }
073        if (al == null) {
074            throw new IOException("Server TransportAcceptListener is null.");
075        }
076
077        connectionCount.incrementAndGet();
078        VMTransport client = new VMTransport(location) {
079            public void stop() throws Exception {
080                if (!disposed.get()) {
081                    super.stop();
082                    if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
083                        VMTransportServer.this.stop();
084                    }
085                }
086            };
087        };
088
089        VMTransport server = new VMTransport(location);
090        client.setPeer(server);
091        server.setPeer(client);
092        al.onAccept(configure(server));
093        return client;
094    }
095
096    /**
097     * Configure transport
098     *
099     * @param transport
100     * @return the Transport
101     */
102    public static Transport configure(Transport transport) {
103        transport = new MutexTransport(transport);
104        transport = new ResponseCorrelator(transport);
105        return transport;
106    }
107
108    /**
109     * Set the Transport accept listener for new Connections
110     *
111     * @param acceptListener
112     */
113    public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
114        this.acceptListener = acceptListener;
115    }
116
117    public void start() throws IOException {
118    }
119
120    public void stop() throws IOException {
121        if (disposed.compareAndSet(false, true)) {
122            VMTransportFactory.stopped(this);
123        }
124    }
125
126    public URI getConnectURI() {
127        return location;
128    }
129
130    public URI getBindURI() {
131        return location;
132    }
133
134    public void setBrokerInfo(BrokerInfo brokerInfo) {
135    }
136
137    public InetSocketAddress getSocketAddress() {
138        return null;
139    }
140
141    public int getConnectionCount() {
142        return connectionCount.intValue();
143    }
144
145    @Override
146    public boolean isSslServer() {
147        return false;
148    }
149
150    @Override
151    public boolean isAllowLinkStealing() {
152        return allowLinkStealing;
153    }
154
155    public void setAllowLinkStealing(boolean allowLinkStealing) {
156        this.allowLinkStealing = allowLinkStealing;
157    }
158}