001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedHashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026 027import javax.jms.JMSException; 028import javax.transaction.xa.XAException; 029 030import org.apache.activemq.broker.jmx.ManagedRegionBroker; 031import org.apache.activemq.broker.region.Destination; 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.BaseCommand; 034import org.apache.activemq.command.ConnectionInfo; 035import org.apache.activemq.command.LocalTransactionId; 036import org.apache.activemq.command.Message; 037import org.apache.activemq.command.MessageAck; 038import org.apache.activemq.command.ProducerInfo; 039import org.apache.activemq.command.TransactionId; 040import org.apache.activemq.command.XATransactionId; 041import org.apache.activemq.state.ProducerState; 042import org.apache.activemq.store.TransactionRecoveryListener; 043import org.apache.activemq.store.TransactionStore; 044import org.apache.activemq.transaction.LocalTransaction; 045import org.apache.activemq.transaction.Synchronization; 046import org.apache.activemq.transaction.Transaction; 047import org.apache.activemq.transaction.XATransaction; 048import org.apache.activemq.util.IOExceptionSupport; 049import org.apache.activemq.util.WrappedException; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * This broker filter handles the transaction related operations in the Broker 055 * interface. 056 * 057 * 058 */ 059public class TransactionBroker extends BrokerFilter { 060 061 private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class); 062 063 // The prepared XA transactions. 064 private TransactionStore transactionStore; 065 private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>(); 066 final ConnectionContext context = new ConnectionContext(); 067 068 public TransactionBroker(Broker next, TransactionStore transactionStore) { 069 super(next); 070 this.transactionStore = transactionStore; 071 } 072 073 // //////////////////////////////////////////////////////////////////////////// 074 // 075 // Life cycle Methods 076 // 077 // //////////////////////////////////////////////////////////////////////////// 078 079 /** 080 * Recovers any prepared transactions. 081 */ 082 public void start() throws Exception { 083 transactionStore.start(); 084 try { 085 context.setBroker(this); 086 context.setInRecoveryMode(true); 087 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 088 context.setProducerFlowControl(false); 089 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 090 producerExchange.setMutable(true); 091 producerExchange.setConnectionContext(context); 092 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 093 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange(); 094 consumerExchange.setConnectionContext(context); 095 transactionStore.recover(new TransactionRecoveryListener() { 096 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { 097 try { 098 beginTransaction(context, xid); 099 XATransaction transaction = (XATransaction) getTransaction(context, xid, false); 100 for (int i = 0; i < addedMessages.length; i++) { 101 forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]); 102 } 103 for (int i = 0; i < aks.length; i++) { 104 forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]); 105 } 106 transaction.setState(Transaction.PREPARED_STATE); 107 registerMBean(transaction); 108 LOG.debug("recovered prepared transaction: {}", transaction.getTransactionId()); 109 } catch (Throwable e) { 110 throw new WrappedException(e); 111 } 112 } 113 }); 114 } catch (WrappedException e) { 115 Throwable cause = e.getCause(); 116 throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause); 117 } 118 next.start(); 119 } 120 121 private void registerMBean(XATransaction transaction) { 122 if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) { 123 ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker(); 124 managedRegionBroker.registerRecoveredTransactionMBean(transaction); 125 } 126 } 127 128 private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction, 129 ActiveMQDestination amqDestination, BaseCommand ack) throws Exception { 130 registerSync(amqDestination, transaction, ack); 131 } 132 133 private void registerSync(ActiveMQDestination destination, Transaction transaction, BaseCommand command) { 134 Synchronization sync = new PreparedDestinationCompletion(this, destination, command.isMessage()); 135 // ensure one per destination in the list 136 Synchronization existing = transaction.findMatching(sync); 137 if (existing != null) { 138 ((PreparedDestinationCompletion)existing).incrementOpCount(); 139 } else { 140 transaction.addSynchronization(sync); 141 } 142 } 143 144 static class PreparedDestinationCompletion extends Synchronization { 145 private final TransactionBroker transactionBroker; 146 final ActiveMQDestination destination; 147 final boolean messageSend; 148 int opCount = 1; 149 public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) { 150 this.transactionBroker = transactionBroker; 151 this.destination = destination; 152 // rollback relevant to acks, commit to sends 153 this.messageSend = messageSend; 154 } 155 156 public void incrementOpCount() { 157 opCount++; 158 } 159 160 @Override 161 public int hashCode() { 162 return System.identityHashCode(destination) + 163 System.identityHashCode(Boolean.valueOf(messageSend)); 164 } 165 166 @Override 167 public boolean equals(Object other) { 168 return other instanceof PreparedDestinationCompletion && 169 destination.equals(((PreparedDestinationCompletion) other).destination) && 170 messageSend == ((PreparedDestinationCompletion) other).messageSend; 171 } 172 173 @Override 174 public void afterRollback() throws Exception { 175 if (!messageSend) { 176 Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false); 177 dest.clearPendingMessages(opCount); 178 dest.getDestinationStatistics().getMessages().add(opCount); 179 LOG.debug("cleared pending from afterRollback: {}", destination); 180 } 181 } 182 183 @Override 184 public void afterCommit() throws Exception { 185 Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false); 186 if (messageSend) { 187 dest.clearPendingMessages(opCount); 188 dest.getDestinationStatistics().getEnqueues().add(opCount); 189 dest.getDestinationStatistics().getMessages().add(opCount); 190 LOG.debug("cleared pending from afterCommit: {}", destination); 191 } else { 192 dest.getDestinationStatistics().getDequeues().add(opCount); 193 } 194 } 195 } 196 197 public void stop() throws Exception { 198 transactionStore.stop(); 199 next.stop(); 200 } 201 202 // //////////////////////////////////////////////////////////////////////////// 203 // 204 // BrokerFilter overrides 205 // 206 // //////////////////////////////////////////////////////////////////////////// 207 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 208 List<TransactionId> txs = new ArrayList<TransactionId>(); 209 synchronized (xaTransactions) { 210 for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) { 211 Transaction tx = iter.next(); 212 if (tx.isPrepared()) { 213 LOG.debug("prepared transaction: {}", tx.getTransactionId()); 214 txs.add(tx.getTransactionId()); 215 } 216 } 217 } 218 XATransactionId rc[] = new XATransactionId[txs.size()]; 219 txs.toArray(rc); 220 LOG.debug("prepared transaction list size: {}", rc.length); 221 return rc; 222 } 223 224 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 225 // the transaction may have already been started. 226 if (xid.isXATransaction()) { 227 XATransaction transaction = null; 228 synchronized (xaTransactions) { 229 transaction = xaTransactions.get(xid); 230 if (transaction != null) { 231 return; 232 } 233 transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId()); 234 xaTransactions.put(xid, transaction); 235 } 236 } else { 237 Map<TransactionId, Transaction> transactionMap = context.getTransactions(); 238 Transaction transaction = transactionMap.get(xid); 239 if (transaction != null) { 240 throw new JMSException("Transaction '" + xid + "' has already been started."); 241 } 242 transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context); 243 transactionMap.put(xid, transaction); 244 } 245 } 246 247 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 248 Transaction transaction = getTransaction(context, xid, false); 249 return transaction.prepare(); 250 } 251 252 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 253 Transaction transaction = getTransaction(context, xid, true); 254 transaction.commit(onePhase); 255 } 256 257 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 258 Transaction transaction = getTransaction(context, xid, true); 259 transaction.rollback(); 260 } 261 262 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 263 Transaction transaction = getTransaction(context, xid, true); 264 transaction.rollback(); 265 } 266 267 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 268 // This method may be invoked recursively. 269 // Track original tx so that it can be restored. 270 final ConnectionContext context = consumerExchange.getConnectionContext(); 271 Transaction originalTx = context.getTransaction(); 272 Transaction transaction = null; 273 if (ack.isInTransaction()) { 274 transaction = getTransaction(context, ack.getTransactionId(), false); 275 } 276 context.setTransaction(transaction); 277 try { 278 next.acknowledge(consumerExchange, ack); 279 } finally { 280 context.setTransaction(originalTx); 281 } 282 } 283 284 public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception { 285 // This method may be invoked recursively. 286 // Track original tx so that it can be restored. 287 final ConnectionContext context = producerExchange.getConnectionContext(); 288 Transaction originalTx = context.getTransaction(); 289 Transaction transaction = null; 290 if (message.getTransactionId() != null) { 291 transaction = getTransaction(context, message.getTransactionId(), false); 292 } 293 context.setTransaction(transaction); 294 try { 295 next.send(producerExchange, message); 296 } finally { 297 context.setTransaction(originalTx); 298 } 299 } 300 301 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 302 for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) { 303 try { 304 Transaction transaction = iter.next(); 305 transaction.rollback(); 306 } catch (Exception e) { 307 LOG.warn("ERROR Rolling back disconnected client's transactions: ", e); 308 } 309 iter.remove(); 310 } 311 312 synchronized (xaTransactions) { 313 // first find all txs that belongs to the connection 314 ArrayList<XATransaction> txs = new ArrayList<XATransaction>(); 315 for (XATransaction tx : xaTransactions.values()) { 316 if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) { 317 txs.add(tx); 318 } 319 } 320 321 // then remove them 322 // two steps needed to avoid ConcurrentModificationException, from removeTransaction() 323 for (XATransaction tx : txs) { 324 try { 325 tx.rollback(); 326 } catch (Exception e) { 327 LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e); 328 } 329 } 330 331 } 332 next.removeConnection(context, info, error); 333 } 334 335 // //////////////////////////////////////////////////////////////////////////// 336 // 337 // Implementation help methods. 338 // 339 // //////////////////////////////////////////////////////////////////////////// 340 public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException { 341 Transaction transaction = null; 342 if (xid.isXATransaction()) { 343 synchronized (xaTransactions) { 344 transaction = xaTransactions.get(xid); 345 } 346 } else { 347 transaction = context.getTransactions().get(xid); 348 } 349 if (transaction != null) { 350 return transaction; 351 } 352 if (xid.isXATransaction()) { 353 XAException e = XATransaction.newXAException("Transaction '" + xid + "' has not been started.", XAException.XAER_NOTA); 354 throw e; 355 } else { 356 throw new JMSException("Transaction '" + xid + "' has not been started."); 357 } 358 } 359 360 public void removeTransaction(XATransactionId xid) { 361 synchronized (xaTransactions) { 362 xaTransactions.remove(xid); 363 } 364 } 365 366}