001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import org.apache.activemq.command.Command;
023import org.apache.activemq.transport.amqp.ResponseHandler;
024import org.apache.qpid.proton.amqp.transport.ErrorCondition;
025import org.apache.qpid.proton.engine.Link;
026import org.apache.qpid.proton.engine.Sender;
027
028/**
029 * Abstract AmqpLink implementation that provide basic Link services.
030 */
031public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLink {
032
033    protected final AmqpSession session;
034    protected final LINK_TYPE endpoint;
035
036    protected boolean closed;
037    protected boolean opened;
038    protected List<Runnable> closeActions = new ArrayList<Runnable>();
039
040    /**
041     * Creates a new AmqpLink type.
042     *
043     * @param session
044     *        the AmqpSession that servers as the parent of this Link.
045     * @param endpoint
046     *        the link endpoint this object represents.
047     */
048    public AmqpAbstractLink(AmqpSession session, LINK_TYPE endpoint) {
049        this.session = session;
050        this.endpoint = endpoint;
051    }
052
053    @Override
054    public void open() {
055        if (!opened) {
056            getEndpoint().setContext(this);
057            getEndpoint().open();
058
059            opened = true;
060        }
061    }
062
063    @Override
064    public void detach() {
065        if (!closed) {
066            if (getEndpoint() != null) {
067                getEndpoint().setContext(null);
068                getEndpoint().detach();
069                getEndpoint().free();
070            }
071        }
072    }
073
074    @Override
075    public void close(ErrorCondition error) {
076        if (!closed) {
077
078            if (getEndpoint() != null) {
079                if (getEndpoint() instanceof Sender) {
080                    getEndpoint().setSource(null);
081                } else {
082                    getEndpoint().setTarget(null);
083                }
084                getEndpoint().setCondition(error);
085            }
086
087            close();
088        }
089    }
090
091    @Override
092    public void close() {
093        if (!closed) {
094
095            if (getEndpoint() != null) {
096                getEndpoint().setContext(null);
097                getEndpoint().close();
098                getEndpoint().free();
099            }
100
101            for (Runnable action : closeActions) {
102                action.run();
103            }
104
105            closeActions.clear();
106            opened = false;
107            closed = true;
108        }
109    }
110
111    /**
112     * @return true if this link has already been opened.
113     */
114    public boolean isOpened() {
115        return opened;
116    }
117
118    /**
119     * @return true if this link has already been closed.
120     */
121    public boolean isClosed() {
122        return closed;
123    }
124
125    /**
126     * @return the Proton Link type this link represents.
127     */
128    public LINK_TYPE getEndpoint() {
129        return endpoint;
130    }
131
132    /**
133     * @return the parent AmqpSession for this Link instance.
134     */
135    public AmqpSession getSession() {
136        return session;
137    }
138
139    @Override
140    public void addCloseAction(Runnable action) {
141        closeActions.add(action);
142    }
143
144    /**
145     * Shortcut method to hand off an ActiveMQ Command to the broker and assign
146     * a ResponseHandler to deal with any reply from the broker.
147     *
148     * @param command
149     *        the Command object to send to the Broker.
150     */
151    protected void sendToActiveMQ(Command command) {
152        session.getConnection().sendToActiveMQ(command, null);
153    }
154
155    /**
156     * Shortcut method to hand off an ActiveMQ Command to the broker and assign
157     * a ResponseHandler to deal with any reply from the broker.
158     *
159     * @param command
160     *        the Command object to send to the Broker.
161     * @param handler
162     *        the ResponseHandler that will handle the Broker's response.
163     */
164    protected void sendToActiveMQ(Command command, ResponseHandler handler) {
165        session.getConnection().sendToActiveMQ(command, handler);
166    }
167}