001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transaction;
018
019import java.io.IOException;
020import javax.transaction.xa.XAException;
021import javax.transaction.xa.XAResource;
022import org.apache.activemq.TransactionContext;
023import org.apache.activemq.broker.TransactionBroker;
024import org.apache.activemq.command.ConnectionId;
025import org.apache.activemq.command.TransactionId;
026import org.apache.activemq.command.XATransactionId;
027import org.apache.activemq.store.TransactionStore;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * 
033 */
034public class XATransaction extends Transaction {
035
036    private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class);
037
038    private final TransactionStore transactionStore;
039    private final XATransactionId xid;
040    private final TransactionBroker broker;
041    private final ConnectionId connectionId;
042
043    public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) {
044        this.transactionStore = transactionStore;
045        this.xid = xid;
046        this.broker = broker;
047        this.connectionId = connectionId;
048        if (LOG.isDebugEnabled()) {
049            LOG.debug("XA Transaction new/begin : " + xid);
050        }
051    }
052
053    @Override
054    public void commit(boolean onePhase) throws XAException, IOException {
055        if (LOG.isDebugEnabled()) {
056            LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid);
057        }
058
059        switch (getState()) {
060        case START_STATE:
061            // 1 phase commit, no work done.
062            checkForPreparedState(onePhase);
063            setStateFinished();
064            break;
065        case IN_USE_STATE:
066            // 1 phase commit, work done.
067            checkForPreparedState(onePhase);
068            doPrePrepare();
069            setStateFinished();
070            storeCommit(getTransactionId(), false, preCommitTask, postCommitTask);
071            break;
072        case PREPARED_STATE:
073            // 2 phase commit, work done.
074            // We would record commit here.
075            storeCommit(getTransactionId(), true, null /* done post prepare call */, postCommitTask);
076            setStateFinished();
077            break;
078        default:
079            illegalStateTransition("commit");
080        }
081    }
082
083    private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit)
084            throws XAException, IOException {
085        try {
086            transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask);
087            waitPostCommitDone(postCommitTask);
088        } catch (XAException xae) {
089            throw xae;
090        } catch (Throwable t) {
091            LOG.warn("Store COMMIT FAILED: " + txid, t);
092            XAException xae = null;
093            if (wasPrepared) {
094                // report and await outcome
095                xae = newXAException("STORE COMMIT FAILED: " + t.getMessage(), XAException.XA_RETRY);
096                // fire rollback syncs to revert
097                doPostRollback();
098            } else {
099                try {
100                    rollback();
101                    xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBCOMMFAIL);
102                } catch (Throwable e) {
103                    xae = newXAException("STORE COMMIT FAILED: " + t.getMessage() +". Rolled failed:"  + e.getMessage(), XAException.XA_RBINTEGRITY);
104                }
105            }
106            xae.initCause(t);
107            throw xae;
108        }
109    }
110
111    private void illegalStateTransition(String callName) throws XAException {
112        XAException xae = newXAException("Cannot call " + callName + " now.", XAException.XAER_PROTO);
113        throw xae;
114    }
115
116    private void checkForPreparedState(boolean onePhase) throws XAException {
117        if (!onePhase) {
118            XAException xae = newXAException("Cannot do 2 phase commit if the transaction has not been prepared", XAException.XAER_PROTO);
119            throw xae;
120        }
121    }
122
123    private void doPrePrepare() throws XAException, IOException {
124        try {
125            prePrepare();
126        } catch (XAException e) {
127            throw e;
128        } catch (Throwable e) {
129            LOG.warn("PRE-PREPARE FAILED: ", e);
130            rollback();
131            XAException xae = newXAException("PRE-PREPARE FAILED: Transaction rolled back", XAException.XA_RBOTHER);
132            xae.initCause(e);
133            throw xae;
134        }
135    }
136
137    @Override
138    public void rollback() throws XAException, IOException {
139
140        if (LOG.isDebugEnabled()) {
141            LOG.debug("XA Transaction rollback: " + xid);
142        }
143
144        switch (getState()) {
145        case START_STATE:
146            // 1 phase rollback no work done.
147            setStateFinished();
148            break;
149        case IN_USE_STATE:
150            // 1 phase rollback work done.
151            setStateFinished();
152            transactionStore.rollback(getTransactionId());
153            doPostRollback();
154            break;
155        case PREPARED_STATE:
156            // 2 phase rollback work done.
157            setStateFinished();
158            transactionStore.rollback(getTransactionId());
159            doPostRollback();
160            break;
161        case FINISHED_STATE:
162            // failure to commit
163            transactionStore.rollback(getTransactionId());
164            doPostRollback();
165            break;
166        default:
167            throw newXAException("Invalid state: " + getState(), XAException.XA_RBPROTO);
168        }
169
170    }
171
172    private void doPostRollback() throws XAException {
173        try {
174            fireAfterRollback();
175        } catch (Throwable e) {
176            // I guess this could happen. Post commit task failed
177            // to execute properly.
178            LOG.warn("POST ROLLBACK FAILED: ", e);
179            XAException xae = newXAException("POST ROLLBACK FAILED", XAException.XAER_RMERR);
180            xae.initCause(e);
181            throw xae;
182        }
183    }
184
185    @Override
186    public int prepare() throws XAException, IOException {
187        if (LOG.isDebugEnabled()) {
188            LOG.debug("XA Transaction prepare: " + xid);
189        }
190
191        switch (getState()) {
192        case START_STATE:
193            // No work done.. no commit/rollback needed.
194            setStateFinished();
195            return XAResource.XA_RDONLY;
196        case IN_USE_STATE:
197            // We would record prepare here.
198            doPrePrepare();
199            setState(Transaction.PREPARED_STATE);
200            transactionStore.prepare(getTransactionId());
201            preCommitTask.run();
202            return XAResource.XA_OK;
203        default:
204            illegalStateTransition("prepare");
205            return XAResource.XA_RDONLY;
206        }
207    }
208
209    private void setStateFinished() {
210        setState(Transaction.FINISHED_STATE);
211        broker.removeTransaction(xid);
212    }
213
214    public ConnectionId getConnectionId() {
215        return connectionId;
216    }
217
218    @Override
219    public TransactionId getTransactionId() {
220        return xid;
221    }
222    
223    @Override
224    public Logger getLog() {
225        return LOG;
226    }
227
228    public XATransactionId getXid() {
229        return xid;
230    }
231}