001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transaction; 018 019import java.io.IOException; 020import javax.transaction.xa.XAException; 021import javax.transaction.xa.XAResource; 022import org.apache.activemq.TransactionContext; 023import org.apache.activemq.broker.TransactionBroker; 024import org.apache.activemq.command.ConnectionId; 025import org.apache.activemq.command.TransactionId; 026import org.apache.activemq.command.XATransactionId; 027import org.apache.activemq.store.TransactionStore; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * 033 */ 034public class XATransaction extends Transaction { 035 036 private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class); 037 038 private final TransactionStore transactionStore; 039 private final XATransactionId xid; 040 private final TransactionBroker broker; 041 private final ConnectionId connectionId; 042 043 public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) { 044 this.transactionStore = transactionStore; 045 this.xid = xid; 046 this.broker = broker; 047 this.connectionId = connectionId; 048 if (LOG.isDebugEnabled()) { 049 LOG.debug("XA Transaction new/begin : " + xid); 050 } 051 } 052 053 @Override 054 public void commit(boolean onePhase) throws XAException, IOException { 055 if (LOG.isDebugEnabled()) { 056 LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid); 057 } 058 059 switch (getState()) { 060 case START_STATE: 061 // 1 phase commit, no work done. 062 checkForPreparedState(onePhase); 063 setStateFinished(); 064 break; 065 case IN_USE_STATE: 066 // 1 phase commit, work done. 067 checkForPreparedState(onePhase); 068 doPrePrepare(); 069 setStateFinished(); 070 storeCommit(getTransactionId(), false, preCommitTask, postCommitTask); 071 break; 072 case PREPARED_STATE: 073 // 2 phase commit, work done. 074 // We would record commit here. 075 storeCommit(getTransactionId(), true, preCommitTask, postCommitTask); 076 setStateFinished(); 077 break; 078 default: 079 illegalStateTransition("commit"); 080 } 081 } 082 083 private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) 084 throws XAException, IOException { 085 try { 086 transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask); 087 waitPostCommitDone(postCommitTask); 088 } catch (XAException xae) { 089 throw xae; 090 } catch (Throwable t) { 091 LOG.warn("Store COMMIT FAILED: " + txid, t); 092 XAException xae = null; 093 if (wasPrepared) { 094 // report and await outcome 095 xae = newXAException("STORE COMMIT FAILED: " + t.getMessage(), XAException.XA_RETRY); 096 // fire rollback syncs to revert 097 doPostRollback(); 098 } else { 099 try { 100 rollback(); 101 xae = newXAException("STORE COMMIT FAILED: Transaction rolled back", XAException.XA_RBCOMMFAIL); 102 } catch (Throwable e) { 103 xae = newXAException("STORE COMMIT FAILED: " + t.getMessage() +". Rolled failed:" + e.getMessage(), XAException.XA_RBINTEGRITY); 104 } 105 } 106 xae.initCause(t); 107 throw xae; 108 } 109 } 110 111 private void illegalStateTransition(String callName) throws XAException { 112 XAException xae = newXAException("Cannot call " + callName + " now.", XAException.XAER_PROTO); 113 throw xae; 114 } 115 116 private void checkForPreparedState(boolean onePhase) throws XAException { 117 if (!onePhase) { 118 XAException xae = newXAException("Cannot do 2 phase commit if the transaction has not been prepared", XAException.XAER_PROTO); 119 throw xae; 120 } 121 } 122 123 private void doPrePrepare() throws XAException, IOException { 124 try { 125 prePrepare(); 126 } catch (XAException e) { 127 throw e; 128 } catch (Throwable e) { 129 LOG.warn("PRE-PREPARE FAILED: ", e); 130 rollback(); 131 XAException xae = newXAException("PRE-PREPARE FAILED: Transaction rolled back", XAException.XA_RBOTHER); 132 xae.initCause(e); 133 throw xae; 134 } 135 } 136 137 @Override 138 public void rollback() throws XAException, IOException { 139 140 if (LOG.isDebugEnabled()) { 141 LOG.debug("XA Transaction rollback: " + xid); 142 } 143 144 switch (getState()) { 145 case START_STATE: 146 // 1 phase rollback no work done. 147 setStateFinished(); 148 break; 149 case IN_USE_STATE: 150 // 1 phase rollback work done. 151 setStateFinished(); 152 transactionStore.rollback(getTransactionId()); 153 doPostRollback(); 154 break; 155 case PREPARED_STATE: 156 // 2 phase rollback work done. 157 setStateFinished(); 158 transactionStore.rollback(getTransactionId()); 159 doPostRollback(); 160 break; 161 case FINISHED_STATE: 162 // failure to commit 163 transactionStore.rollback(getTransactionId()); 164 doPostRollback(); 165 break; 166 default: 167 throw newXAException("Invalid state: " + getState(), XAException.XA_RBPROTO); 168 } 169 170 } 171 172 private void doPostRollback() throws XAException { 173 try { 174 fireAfterRollback(); 175 } catch (Throwable e) { 176 // I guess this could happen. Post commit task failed 177 // to execute properly. 178 LOG.warn("POST ROLLBACK FAILED: ", e); 179 XAException xae = newXAException("POST ROLLBACK FAILED", XAException.XAER_RMERR); 180 xae.initCause(e); 181 throw xae; 182 } 183 } 184 185 @Override 186 public int prepare() throws XAException, IOException { 187 if (LOG.isDebugEnabled()) { 188 LOG.debug("XA Transaction prepare: " + xid); 189 } 190 191 switch (getState()) { 192 case START_STATE: 193 // No work done.. no commit/rollback needed. 194 setStateFinished(); 195 return XAResource.XA_RDONLY; 196 case IN_USE_STATE: 197 // We would record prepare here. 198 doPrePrepare(); 199 setState(Transaction.PREPARED_STATE); 200 transactionStore.prepare(getTransactionId()); 201 return XAResource.XA_OK; 202 default: 203 illegalStateTransition("prepare"); 204 return XAResource.XA_RDONLY; 205 } 206 } 207 208 private void setStateFinished() { 209 setState(Transaction.FINISHED_STATE); 210 broker.removeTransaction(xid); 211 } 212 213 public ConnectionId getConnectionId() { 214 return connectionId; 215 } 216 217 @Override 218 public TransactionId getTransactionId() { 219 return xid; 220 } 221 222 @Override 223 public Logger getLog() { 224 return LOG; 225 } 226 227 public XATransactionId getXid() { 228 return xid; 229 } 230}