001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes; 020import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; 021 022import java.io.IOException; 023import java.util.HashSet; 024import java.util.Set; 025 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ConnectionId; 028import org.apache.activemq.command.ExceptionResponse; 029import org.apache.activemq.command.LocalTransactionId; 030import org.apache.activemq.command.Response; 031import org.apache.activemq.command.TransactionInfo; 032import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 033import org.apache.activemq.transport.amqp.ResponseHandler; 034import org.apache.qpid.proton.Proton; 035import org.apache.qpid.proton.amqp.Binary; 036import org.apache.qpid.proton.amqp.Symbol; 037import org.apache.qpid.proton.amqp.messaging.Accepted; 038import org.apache.qpid.proton.amqp.messaging.AmqpValue; 039import org.apache.qpid.proton.amqp.messaging.Rejected; 040import org.apache.qpid.proton.amqp.transaction.Declare; 041import org.apache.qpid.proton.amqp.transaction.Declared; 042import org.apache.qpid.proton.amqp.transaction.Discharge; 043import org.apache.qpid.proton.amqp.transport.ErrorCondition; 044import org.apache.qpid.proton.engine.Delivery; 045import org.apache.qpid.proton.engine.Receiver; 046import org.apache.qpid.proton.message.Message; 047import org.fusesource.hawtbuf.Buffer; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * Implements the AMQP Transaction Coordinator support to manage local 053 * transactions between an AMQP client and the broker. 054 */ 055public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { 056 057 private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class); 058 059 private final Set<AmqpSession> txSessions = new HashSet<AmqpSession>(); 060 061 /** 062 * Creates a new Transaction coordinator used to manage AMQP transactions. 063 * 064 * @param session 065 * the AmqpSession under which the coordinator was created. 066 * @param receiver 067 * the AMQP receiver link endpoint for this coordinator. 068 */ 069 public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) { 070 super(session, endpoint); 071 } 072 073 @Override 074 protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception { 075 Message message = Proton.message(); 076 int offset = deliveryBytes.offset; 077 int len = deliveryBytes.length; 078 079 while (len > 0) { 080 final int decoded = message.decode(deliveryBytes.data, offset, len); 081 assert decoded > 0 : "Make progress decoding the message"; 082 offset += decoded; 083 len -= decoded; 084 } 085 086 final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext(); 087 final ConnectionId connectionId = session.getConnection().getConnectionId(); 088 final Object action = ((AmqpValue) message.getBody()).getValue(); 089 090 LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes); 091 if (action instanceof Declare) { 092 Declare declare = (Declare) action; 093 if (declare.getGlobalId() != null) { 094 throw new Exception("don't know how to handle a declare /w a set GlobalId"); 095 } 096 097 LocalTransactionId txId = session.getConnection().getNextTransactionId(); 098 TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN); 099 session.getConnection().registerTransaction(txId, this); 100 sendToActiveMQ(txInfo, null); 101 LOG.trace("started transaction {}", txId.getValue()); 102 103 Declared declared = new Declared(); 104 declared.setTxnId(new Binary(toBytes(txId.getValue()))); 105 delivery.disposition(declared); 106 delivery.settle(); 107 } else if (action instanceof Discharge) { 108 final Discharge discharge = (Discharge) action; 109 final LocalTransactionId txId = new LocalTransactionId(connectionId, toLong(discharge.getTxnId())); 110 final byte operation; 111 112 if (discharge.getFail()) { 113 LOG.trace("rollback transaction {}", txId.getValue()); 114 operation = TransactionInfo.ROLLBACK; 115 } else { 116 LOG.trace("commit transaction {}", txId.getValue()); 117 operation = TransactionInfo.COMMIT_ONE_PHASE; 118 } 119 120 for (AmqpSession txSession : txSessions) { 121 if (operation == TransactionInfo.ROLLBACK) { 122 txSession.rollback(); 123 } else { 124 txSession.commit(); 125 } 126 } 127 128 txSessions.clear(); 129 session.getConnection().unregisterTransaction(txId); 130 131 TransactionInfo txinfo = new TransactionInfo(connectionId, txId, operation); 132 sendToActiveMQ(txinfo, new ResponseHandler() { 133 @Override 134 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 135 if (response.isException()) { 136 ExceptionResponse er = (ExceptionResponse) response; 137 Rejected rejected = new Rejected(); 138 rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage())); 139 delivery.disposition(rejected); 140 } else { 141 delivery.disposition(Accepted.getInstance()); 142 } 143 144 LOG.debug("TX: {} settling {}", operation, action); 145 delivery.settle(); 146 session.pumpProtonToSocket(); 147 } 148 }); 149 150 if (operation == TransactionInfo.ROLLBACK) { 151 session.flushPendingMessages(); 152 } 153 154 } else { 155 throw new Exception("Expected coordinator message type: " + action.getClass()); 156 } 157 158 replenishCredit(); 159 } 160 161 private void replenishCredit() { 162 if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) { 163 LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), session.getSessionId()); 164 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); 165 session.pumpProtonToSocket(); 166 } 167 } 168 169 @Override 170 public ActiveMQDestination getDestination() { 171 return null; 172 } 173 174 @Override 175 public void setDestination(ActiveMQDestination destination) { 176 } 177 178 public void enlist(AmqpSession session) { 179 txSessions.add(session); 180 } 181}