001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import org.apache.activemq.command.Command; 023import org.apache.activemq.transport.amqp.ResponseHandler; 024import org.apache.qpid.proton.amqp.transport.ErrorCondition; 025import org.apache.qpid.proton.engine.Link; 026import org.apache.qpid.proton.engine.Sender; 027 028/** 029 * Abstract AmqpLink implementation that provide basic Link services. 030 */ 031public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLink { 032 033 protected final AmqpSession session; 034 protected final LINK_TYPE endpoint; 035 036 protected boolean closed; 037 protected boolean opened; 038 protected List<Runnable> closeActions = new ArrayList<Runnable>(); 039 040 /** 041 * Creates a new AmqpLink type. 042 * 043 * @param session 044 * the AmqpSession that servers as the parent of this Link. 045 * @param endpoint 046 * the link endpoint this object represents. 047 */ 048 public AmqpAbstractLink(AmqpSession session, LINK_TYPE endpoint) { 049 this.session = session; 050 this.endpoint = endpoint; 051 } 052 053 @Override 054 public void open() { 055 if (!opened) { 056 getEndpoint().setContext(this); 057 getEndpoint().open(); 058 059 opened = true; 060 } 061 } 062 063 @Override 064 public void detach() { 065 if (!closed) { 066 if (getEndpoint() != null) { 067 getEndpoint().setContext(null); 068 getEndpoint().detach(); 069 getEndpoint().free(); 070 } 071 } 072 } 073 074 @Override 075 public void close(ErrorCondition error) { 076 if (!closed) { 077 078 if (getEndpoint() != null) { 079 if (getEndpoint() instanceof Sender) { 080 getEndpoint().setSource(null); 081 } else { 082 getEndpoint().setTarget(null); 083 } 084 getEndpoint().setCondition(error); 085 } 086 087 close(); 088 } 089 } 090 091 @Override 092 public void close() { 093 if (!closed) { 094 095 if (getEndpoint() != null) { 096 getEndpoint().setContext(null); 097 getEndpoint().close(); 098 getEndpoint().free(); 099 } 100 101 for (Runnable action : closeActions) { 102 action.run(); 103 } 104 105 closeActions.clear(); 106 opened = false; 107 closed = true; 108 } 109 } 110 111 /** 112 * @return true if this link has already been opened. 113 */ 114 public boolean isOpened() { 115 return opened; 116 } 117 118 /** 119 * @return true if this link has already been closed. 120 */ 121 public boolean isClosed() { 122 return closed; 123 } 124 125 /** 126 * @return the Proton Link type this link represents. 127 */ 128 public LINK_TYPE getEndpoint() { 129 return endpoint; 130 } 131 132 /** 133 * @return the parent AmqpSession for this Link instance. 134 */ 135 public AmqpSession getSession() { 136 return session; 137 } 138 139 @Override 140 public void addCloseAction(Runnable action) { 141 closeActions.add(action); 142 } 143 144 /** 145 * Shortcut method to hand off an ActiveMQ Command to the broker and assign 146 * a ResponseHandler to deal with any reply from the broker. 147 * 148 * @param command 149 * the Command object to send to the Broker. 150 */ 151 protected void sendToActiveMQ(Command command) { 152 session.getConnection().sendToActiveMQ(command, null); 153 } 154 155 /** 156 * Shortcut method to hand off an ActiveMQ Command to the broker and assign 157 * a ResponseHandler to deal with any reply from the broker. 158 * 159 * @param command 160 * the Command object to send to the Broker. 161 * @param handler 162 * the ResponseHandler that will handle the Broker's response. 163 */ 164 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 165 session.getConnection().sendToActiveMQ(command, handler); 166 } 167}