001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import org.apache.activemq.transport.amqp.AmqpProtocolException;
020import org.apache.qpid.proton.engine.Delivery;
021import org.apache.qpid.proton.engine.Receiver;
022import org.fusesource.hawtbuf.Buffer;
023import org.fusesource.hawtbuf.ByteArrayOutputStream;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Abstract base that provides common services for AMQP Receiver types.
029 */
030public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
031
032    private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractReceiver.class);
033
034    protected ByteArrayOutputStream current = new ByteArrayOutputStream();
035    protected final byte[] recvBuffer = new byte[1024 * 8];
036    protected final int configuredCredit;
037
038    /**
039     * Handle create of new AMQP Receiver instance.
040     *
041     * @param session
042     *        the AmqpSession that servers as the parent of this Link.
043     * @param endpoint
044     *        the Receiver endpoint being managed by this class.
045     */
046    public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
047        super(session, endpoint);
048        this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
049    }
050
051    @Override
052    public void detach() {
053    }
054
055    @Override
056    public void flow() throws Exception {
057    }
058
059    /**
060     * Returns the amount of receiver credit that has been configured for this AMQP
061     * transport.  If no value was configured on the TransportConnector URI then a
062     * sensible default is used.
063     *
064     * @return the configured receiver credit to grant.
065     */
066    public int getConfiguredReceiverCredit() {
067        return configuredCredit;
068    }
069
070    /**
071     * Provide the receiver endpoint with the given amount of credits.
072     *
073     * @param credits
074     *        the credit value to pass on to the wrapped Receiver.
075     */
076    public void flow(int credits) {
077        getEndpoint().flow(credits);
078    }
079
080    @Override
081    public void commit() throws Exception {
082    }
083
084    @Override
085    public void rollback() throws Exception {
086    }
087
088    @Override
089    public void delivery(Delivery delivery) throws Exception {
090
091        if (!delivery.isReadable()) {
092            LOG.debug("Delivery was not readable!");
093            return;
094        }
095
096        if (current == null) {
097            current = new ByteArrayOutputStream();
098        }
099
100        int count;
101        while ((count = getEndpoint().recv(recvBuffer, 0, recvBuffer.length)) > 0) {
102            current.write(recvBuffer, 0, count);
103
104            if (current.size() > session.getMaxFrameSize()) {
105                throw new AmqpProtocolException("Frame size of " + current.size() + " larger than max allowed " + session.getMaxFrameSize());
106            }
107        }
108
109        // Expecting more deliveries..
110        if (count == 0) {
111            return;
112        }
113
114        try {
115            processDelivery(delivery, current.toBuffer());
116        } finally {
117            getEndpoint().advance();
118            current = null;
119        }
120    }
121
122    protected abstract void processDelivery(Delivery delivery, Buffer deliveryBytes) throws Exception;
123
124}