001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport; 018 019import java.io.IOException; 020import java.net.URI; 021import org.apache.activemq.util.ServiceSupport; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025/** 026 * A useful base class for transport implementations. 027 * 028 * 029 */ 030public abstract class TransportSupport extends ServiceSupport implements Transport { 031 private static final Logger LOG = LoggerFactory.getLogger(TransportSupport.class); 032 033 TransportListener transportListener; 034 035 /** 036 * Returns the current transport listener 037 */ 038 public TransportListener getTransportListener() { 039 return transportListener; 040 } 041 042 /** 043 * Registers an inbound command listener 044 * 045 * @param commandListener 046 */ 047 public void setTransportListener(TransportListener commandListener) { 048 this.transportListener = commandListener; 049 } 050 051 /** 052 * narrow acceptance 053 * 054 * @param target 055 * @return 'this' if assignable 056 */ 057 public <T> T narrow(Class<T> target) { 058 boolean assignableFrom = target.isAssignableFrom(getClass()); 059 if (assignableFrom) { 060 return target.cast(this); 061 } 062 return null; 063 } 064 065 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 066 throw new AssertionError("Unsupported Method"); 067 } 068 069 public Object request(Object command) throws IOException { 070 throw new AssertionError("Unsupported Method"); 071 } 072 073 public Object request(Object command, int timeout) throws IOException { 074 throw new AssertionError("Unsupported Method"); 075 } 076 077 /** 078 * Process the inbound command 079 */ 080 public void doConsume(Object command) { 081 if (command != null) { 082 if (transportListener != null) { 083 transportListener.onCommand(command); 084 } else { 085 LOG.error("No transportListener available to process inbound command: " + command); 086 } 087 } 088 } 089 090 /** 091 * Passes any IO exceptions into the transport listener 092 */ 093 public void onException(IOException e) { 094 if (transportListener != null) { 095 try { 096 transportListener.onException(e); 097 } catch (RuntimeException e2) { 098 // Handle any unexpected runtime exceptions by debug logging 099 // them. 100 LOG.debug("Unexpected runtime exception: " + e2, e2); 101 } 102 } 103 } 104 105 protected void checkStarted() throws IOException { 106 if (!isStarted()) { 107 throw new IOException("The transport is not running."); 108 } 109 } 110 111 public boolean isFaultTolerant() { 112 return false; 113 } 114 115 public void reconnect(URI uri) throws IOException { 116 throw new IOException("Not supported"); 117 } 118 119 public boolean isReconnectSupported() { 120 return false; 121 } 122 123 public boolean isUpdateURIsSupported() { 124 return false; 125 } 126 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 127 throw new IOException("Not supported"); 128 } 129 130 public boolean isDisposed() { 131 return isStopped(); 132 } 133 134 public boolean isConnected() { 135 return isStarted(); 136 } 137 138}