/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport;

import java.io.IOException;
import java.net.URI;

/**
 * A {@link Transport} that wraps another transport ({@code next}) and forwards
 * every call to it. It also implements {@link TransportListener}: once a
 * listener is registered, this filter installs itself as the listener of
 * {@code next} and relays each event it receives to the registered listener.
 * <p>
 * Subclasses can override individual methods to intercept calls or events
 * while inheriting plain delegation for everything else.
 */
public class TransportFilter implements TransportListener, Transport {
    /** The wrapped transport that all calls are forwarded to. */
    protected final Transport next;
    /** The listener that events received from {@code next} are relayed to. */
    protected TransportListener transportListener;

    /**
     * Creates a filter around the given transport.
     *
     * @param next the transport to delegate to; not validated here, but
     *             {@link #start()} rejects a {@code null} value
     */
    public TransportFilter(Transport next) {
        this.next = next;
    }

    /**
     * @return the listener currently registered on this filter, or
     *         {@code null} if none has been set
     */
    public TransportListener getTransportListener() {
        return transportListener;
    }

    /**
     * Registers the listener that receives the events relayed by this filter.
     * When a non-null listener is supplied, this filter registers itself as
     * the listener of {@code next}; when {@code null} is supplied, the
     * listener of {@code next} is cleared as well.
     *
     * @param channelListener the listener to relay events to, or {@code null}
     *                        to detach
     */
    public void setTransportListener(TransportListener channelListener) {
        this.transportListener = channelListener;
        if (channelListener == null) {
            next.setTransportListener(null);
        } else {
            next.setTransportListener(this);
        }
    }

    /**
     * Starts the wrapped transport after checking that this filter is fully
     * wired.
     *
     * @see org.apache.activemq.Service#start()
     * @throws IOException if the next channel or the listener has not been set
     * @throws Exception   if starting the wrapped transport fails
     */
    public void start() throws Exception {
        if (next == null) {
            throw new IOException("The next channel has not been set.");
        }
        if (transportListener == null) {
            throw new IOException("The command listener has not been set.");
        }
        next.start();
    }

    /**
     * Stops the wrapped transport.
     *
     * @see org.apache.activemq.Service#stop()
     */
    public void stop() throws Exception {
        next.stop();
    }

    /**
     * Relays a command received from {@code next} to the registered listener.
     *
     * @param command the received command
     */
    public void onCommand(Object command) {
        transportListener.onCommand(command);
    }

    /**
     * @return Returns the next.
     */
    public Transport getNext() {
        return next;
    }

    /**
     * @return the string form of the wrapped transport
     */
    @Override
    public String toString() {
        return next.toString();
    }

    /**
     * Forwards a one-way command to the wrapped transport.
     *
     * @param command the command to send
     * @throws IOException if the wrapped transport fails to send
     */
    public void oneway(Object command) throws IOException {
        next.oneway(command);
    }

    /**
     * Forwards an asynchronous request to the wrapped transport.
     *
     * @param command          the command to send
     * @param responseCallback the callback supplied by the caller; may be
     *                         {@code null}
     * @return the future returned by the wrapped transport
     * @throws IOException if the wrapped transport fails to send
     */
    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        // Hand the caller's callback to the wrapped transport instead of
        // discarding it, consistent with every other delegating method here.
        return next.asyncRequest(command, responseCallback);
    }

    /**
     * Forwards a synchronous request to the wrapped transport.
     *
     * @param command the command to send
     * @return the response returned by the wrapped transport
     * @throws IOException if the wrapped transport fails
     */
    public Object request(Object command) throws IOException {
        return next.request(command);
    }

    /**
     * Forwards a synchronous request with a timeout to the wrapped transport.
     *
     * @param command the command to send
     * @param timeout the timeout value passed through unchanged
     * @return the response returned by the wrapped transport
     * @throws IOException if the wrapped transport fails
     */
    public Object request(Object command, int timeout) throws IOException {
        return next.request(command, timeout);
    }

    /**
     * Relays an error reported by {@code next} to the registered listener.
     *
     * @param error the reported error
     */
    public void onException(IOException error) {
        transportListener.onException(error);
    }

    /**
     * Relays the "transport interrupted" event to the registered listener.
     */
    public void transportInterupted() {
        transportListener.transportInterupted();
    }

    /**
     * Relays the "transport resumed" event to the registered listener.
     */
    public void transportResumed() {
        transportListener.transportResumed();
    }

    /**
     * Looks for an object of the requested type in this filter chain.
     *
     * @param target the type being looked for
     * @return this filter if its class is assignable to {@code target};
     *         otherwise whatever the wrapped transport returns for the same
     *         request
     */
    public <T> T narrow(Class<T> target) {
        if (target.isAssignableFrom(getClass())) {
            return target.cast(this);
        }
        return next.narrow(target);
    }

    /**
     * @return the remote address reported by the wrapped transport
     */
    public String getRemoteAddress() {
        return next.getRemoteAddress();
    }

    /**
     * @return the fault-tolerance flag reported by the wrapped transport
     * @see org.apache.activemq.transport.Transport#isFaultTolerant()
     */
    public boolean isFaultTolerant() {
        return next.isFaultTolerant();
    }

    /**
     * @return the disposed flag reported by the wrapped transport
     */
    public boolean isDisposed() {
        return next.isDisposed();
    }

    /**
     * @return the connected flag reported by the wrapped transport
     */
    public boolean isConnected() {
        return next.isConnected();
    }

    /**
     * Forwards a reconnect request to the wrapped transport.
     *
     * @param uri the URI passed through unchanged
     * @throws IOException if the wrapped transport fails
     */
    public void reconnect(URI uri) throws IOException {
        next.reconnect(uri);
    }

    /**
     * @return the receive counter reported by the wrapped transport
     */
    public int getReceiveCounter() {
        return next.getReceiveCounter();
    }

    /**
     * @return whether the wrapped transport reports reconnect support
     */
    public boolean isReconnectSupported() {
        return next.isReconnectSupported();
    }

    /**
     * @return whether the wrapped transport reports support for URI updates
     */
    public boolean isUpdateURIsSupported() {
        return next.isUpdateURIsSupported();
    }

    /**
     * Forwards a URI update to the wrapped transport.
     *
     * @param rebalance flag passed through unchanged
     * @param uris      the URIs passed through unchanged
     * @throws IOException if the wrapped transport fails
     */
    public void updateURIs(boolean rebalance, URI[] uris) throws IOException {
        next.updateURIs(rebalance, uris);
    }
}