/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.vm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;

/**
 * Broker side of the VMTransport.
 * <p>
 * Each call to {@link #connect()} creates a pair of {@link VMTransport}
 * instances wired to each other as peers: the "server" half is handed to the
 * registered {@link TransportAcceptListener}, the "client" half is returned
 * to the caller.
 */
public class VMTransportServer implements TransportServer {

    /** Receives the server half of every new connection; guarded by {@code this}. */
    private TransportAcceptListener acceptListener;

    /** URI reported as both the connect and the bind URI, and given to each VMTransport. */
    private final URI location;

    /** Flips to {@code true} exactly once, on the first call to {@link #stop()}. */
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    /** Incremented by {@link #connect()}, decremented when a client transport is stopped. */
    private final AtomicInteger connectionCount = new AtomicInteger(0);

    /** When {@code true}, the server stops itself once the connection count drops to zero. */
    private final boolean disposeOnDisconnect;

    private boolean allowLinkStealing;

    /**
     * Creates a server for the given location.
     *
     * @param location the URI returned by {@link #getConnectURI()} and
     *        {@link #getBindURI()} and passed to every transport created by
     *        {@link #connect()}
     * @param disposeOnDisconnect if {@code true}, stopping a client transport
     *        that brings the connection count to zero also stops this server
     */
    public VMTransportServer(URI location, boolean disposeOnDisconnect) {
        this.location = location;
        this.disposeOnDisconnect = disposeOnDisconnect;
    }

    /**
     * @return a pretty print of this, of the form {@code VMTransportServer(<location>)}
     */
    @Override
    public String toString() {
        return "VMTransportServer(" + location + ")";
    }

    /**
     * Creates a new client/server transport pair and announces the server
     * half to the accept listener.
     *
     * @return the client half of the new connection
     * @throws IOException if this server has been disposed, or if no accept
     *         listener has been set
     */
    public VMTransport connect() throws IOException {
        TransportAcceptListener listener;
        // Same monitor as setAcceptListener, so the listener is read consistently.
        synchronized (this) {
            if (disposed.get()) {
                throw new IOException("Server has been disposed.");
            }
            listener = acceptListener;
        }
        if (listener == null) {
            throw new IOException("Server TransportAcceptListener is null.");
        }

        // The count is bumped before the listener is notified.
        connectionCount.incrementAndGet();
        VMTransport client = new VMTransport(location) {
            @Override
            public void stop() throws Exception {
                // NOTE(review): the unqualified 'disposed' resolves to a field
                // inherited from VMTransport if an accessible one exists, and
                // only otherwise to the enclosing server's flag - confirm
                // against VMTransport before touching this condition.
                if (!disposed.get()) {
                    super.stop();
                    // Last client gone: optionally take the server down too.
                    if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
                        VMTransportServer.this.stop();
                    }
                }
            }
        };

        VMTransport server = new VMTransport(location);
        client.setPeer(server);
        server.setPeer(client);
        listener.onAccept(configure(server));
        return client;
    }

    /**
     * Configure transport by wrapping it in a {@link MutexTransport} and then
     * a {@link ResponseCorrelator}.
     *
     * @param transport the transport to wrap
     * @return the Transport, with the {@link ResponseCorrelator} outermost
     */
    public static Transport configure(Transport transport) {
        return new ResponseCorrelator(new MutexTransport(transport));
    }

    /**
     * Set the Transport accept listener for new Connections
     *
     * @param acceptListener the listener that {@link #connect()} will notify;
     *        while it is {@code null}, {@link #connect()} throws
     */
    public synchronized void setAcceptListener(TransportAcceptListener acceptListener) {
        this.acceptListener = acceptListener;
    }

    /**
     * Does nothing.
     *
     * @throws IOException never thrown by this implementation
     */
    public void start() throws IOException {
    }

    /**
     * Marks this server as disposed and, on the first call only, reports it
     * to {@link VMTransportFactory#stopped(VMTransportServer)}. Later calls
     * have no effect.
     *
     * @throws IOException declared by the signature; not thrown by the code
     *         visible here
     */
    public void stop() throws IOException {
        if (disposed.compareAndSet(false, true)) {
            VMTransportFactory.stopped(this);
        }
    }

    /**
     * @return the location this server was created with
     */
    public URI getConnectURI() {
        return location;
    }

    /**
     * @return the location this server was created with (same as {@link #getConnectURI()})
     */
    public URI getBindURI() {
        return location;
    }

    /**
     * Ignores the supplied broker info.
     *
     * @param brokerInfo unused
     */
    public void setBrokerInfo(BrokerInfo brokerInfo) {
    }

    /**
     * @return always {@code null}
     */
    public InetSocketAddress getSocketAddress() {
        return null;
    }

    /**
     * @return the current value of the connection counter
     */
    public int getConnectionCount() {
        return connectionCount.intValue();
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isSslServer() {
        return false;
    }

    /**
     * @return the value last given to {@link #setAllowLinkStealing(boolean)},
     *         {@code false} if it was never called
     */
    @Override
    public boolean isAllowLinkStealing() {
        return allowLinkStealing;
    }

    /**
     * @param allowLinkStealing the value to report from {@link #isAllowLinkStealing()}
     */
    public void setAllowLinkStealing(boolean allowLinkStealing) {
        this.allowLinkStealing = allowLinkStealing;
    }
}