001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
020import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
021
022import java.io.IOException;
023import java.util.HashSet;
024import java.util.Set;
025
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ConnectionId;
028import org.apache.activemq.command.ExceptionResponse;
029import org.apache.activemq.command.LocalTransactionId;
030import org.apache.activemq.command.Response;
031import org.apache.activemq.command.TransactionInfo;
032import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
033import org.apache.activemq.transport.amqp.ResponseHandler;
034import org.apache.qpid.proton.Proton;
035import org.apache.qpid.proton.amqp.Binary;
036import org.apache.qpid.proton.amqp.Symbol;
037import org.apache.qpid.proton.amqp.messaging.Accepted;
038import org.apache.qpid.proton.amqp.messaging.AmqpValue;
039import org.apache.qpid.proton.amqp.messaging.Rejected;
040import org.apache.qpid.proton.amqp.transaction.Declare;
041import org.apache.qpid.proton.amqp.transaction.Declared;
042import org.apache.qpid.proton.amqp.transaction.Discharge;
043import org.apache.qpid.proton.amqp.transport.ErrorCondition;
044import org.apache.qpid.proton.engine.Delivery;
045import org.apache.qpid.proton.engine.Receiver;
046import org.apache.qpid.proton.message.Message;
047import org.fusesource.hawtbuf.Buffer;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Implements the AMQP Transaction Coordinator support to manage local
053 * transactions between an AMQP client and the broker.
054 */
055public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
056
057    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
058
059    private final Set<AmqpSession> txSessions = new HashSet<AmqpSession>();
060
061    /**
062     * Creates a new Transaction coordinator used to manage AMQP transactions.
063     *
064     * @param session
065     *        the AmqpSession under which the coordinator was created.
066     * @param receiver
067     *        the AMQP receiver link endpoint for this coordinator.
068     */
069    public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) {
070        super(session, endpoint);
071    }
072
073    @Override
074    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
075        Message message = Proton.message();
076        int offset = deliveryBytes.offset;
077        int len = deliveryBytes.length;
078
079        while (len > 0) {
080            final int decoded = message.decode(deliveryBytes.data, offset, len);
081            assert decoded > 0 : "Make progress decoding the message";
082            offset += decoded;
083            len -= decoded;
084        }
085
086        final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
087        final ConnectionId connectionId = session.getConnection().getConnectionId();
088        final Object action = ((AmqpValue) message.getBody()).getValue();
089
090        LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
091        if (action instanceof Declare) {
092            Declare declare = (Declare) action;
093            if (declare.getGlobalId() != null) {
094                throw new Exception("don't know how to handle a declare /w a set GlobalId");
095            }
096
097            LocalTransactionId txId = session.getConnection().getNextTransactionId();
098            TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
099            session.getConnection().registerTransaction(txId, this);
100            sendToActiveMQ(txInfo, null);
101            LOG.trace("started transaction {}", txId.getValue());
102
103            Declared declared = new Declared();
104            declared.setTxnId(new Binary(toBytes(txId.getValue())));
105            delivery.disposition(declared);
106            delivery.settle();
107        } else if (action instanceof Discharge) {
108            final Discharge discharge = (Discharge) action;
109            final LocalTransactionId txId = new LocalTransactionId(connectionId, toLong(discharge.getTxnId()));
110            final byte operation;
111
112            if (discharge.getFail()) {
113                LOG.trace("rollback transaction {}", txId.getValue());
114                operation = TransactionInfo.ROLLBACK;
115            } else {
116                LOG.trace("commit transaction {}", txId.getValue());
117                operation = TransactionInfo.COMMIT_ONE_PHASE;
118            }
119
120            for (AmqpSession txSession : txSessions) {
121                if (operation == TransactionInfo.ROLLBACK) {
122                    txSession.rollback();
123                } else {
124                    txSession.commit();
125                }
126            }
127
128            txSessions.clear();
129            session.getConnection().unregisterTransaction(txId);
130
131            TransactionInfo txinfo = new TransactionInfo(connectionId, txId, operation);
132            sendToActiveMQ(txinfo, new ResponseHandler() {
133                @Override
134                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
135                    if (response.isException()) {
136                        ExceptionResponse er = (ExceptionResponse) response;
137                        Rejected rejected = new Rejected();
138                        rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
139                        delivery.disposition(rejected);
140                    } else {
141                        delivery.disposition(Accepted.getInstance());
142                    }
143
144                    LOG.debug("TX: {} settling {}", operation, action);
145                    delivery.settle();
146                    session.pumpProtonToSocket();
147                }
148            });
149
150            if (operation == TransactionInfo.ROLLBACK) {
151                session.flushPendingMessages();
152            }
153
154        } else {
155            throw new Exception("Expected coordinator message type: " + action.getClass());
156        }
157
158        replenishCredit();
159    }
160
161    private void replenishCredit() {
162        if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
163            LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), session.getSessionId());
164            getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
165            session.pumpProtonToSocket();
166        }
167    }
168
169    @Override
170    public ActiveMQDestination getDestination() {
171        return null;
172    }
173
174    @Override
175    public void setDestination(ActiveMQDestination destination) {
176    }
177
178    public void enlist(AmqpSession session) {
179        txSessions.add(session);
180    }
181}