001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026
027import javax.jms.JMSException;
028import javax.transaction.xa.XAException;
029
030import org.apache.activemq.broker.jmx.ManagedRegionBroker;
031import org.apache.activemq.broker.region.Destination;
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.BaseCommand;
034import org.apache.activemq.command.ConnectionInfo;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.Message;
037import org.apache.activemq.command.MessageAck;
038import org.apache.activemq.command.ProducerInfo;
039import org.apache.activemq.command.TransactionId;
040import org.apache.activemq.command.XATransactionId;
041import org.apache.activemq.state.ProducerState;
042import org.apache.activemq.store.TransactionRecoveryListener;
043import org.apache.activemq.store.TransactionStore;
044import org.apache.activemq.transaction.LocalTransaction;
045import org.apache.activemq.transaction.Synchronization;
046import org.apache.activemq.transaction.Transaction;
047import org.apache.activemq.transaction.XATransaction;
048import org.apache.activemq.util.IOExceptionSupport;
049import org.apache.activemq.util.WrappedException;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * This broker filter handles the transaction related operations in the Broker
055 * interface.
056 * 
057 * 
058 */
059public class TransactionBroker extends BrokerFilter {
060
061    private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
062
063    // The prepared XA transactions.
064    private TransactionStore transactionStore;
065    private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
066    final ConnectionContext context = new ConnectionContext();
067
068    public TransactionBroker(Broker next, TransactionStore transactionStore) {
069        super(next);
070        this.transactionStore = transactionStore;
071    }
072
073    // ////////////////////////////////////////////////////////////////////////////
074    //
075    // Life cycle Methods
076    //
077    // ////////////////////////////////////////////////////////////////////////////
078
079    /**
080     * Recovers any prepared transactions.
081     */
082    public void start() throws Exception {
083        transactionStore.start();
084        try {
085            context.setBroker(this);
086            context.setInRecoveryMode(true);
087            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
088            context.setProducerFlowControl(false);
089            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
090            producerExchange.setMutable(true);
091            producerExchange.setConnectionContext(context);
092            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
093            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
094            consumerExchange.setConnectionContext(context);
095            transactionStore.recover(new TransactionRecoveryListener() {
096                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
097                    try {
098                        beginTransaction(context, xid);
099                        XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
100                        for (int i = 0; i < addedMessages.length; i++) {
101                            forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
102                        }
103                        for (int i = 0; i < aks.length; i++) {
104                            forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
105                        }
106                        transaction.setState(Transaction.PREPARED_STATE);
107                        registerMBean(transaction);
108                        LOG.debug("recovered prepared transaction: {}", transaction.getTransactionId());
109                    } catch (Throwable e) {
110                        throw new WrappedException(e);
111                    }
112                }
113            });
114        } catch (WrappedException e) {
115            Throwable cause = e.getCause();
116            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
117        }
118        next.start();
119    }
120
121    private void registerMBean(XATransaction transaction) {
122        if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
123            ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
124            managedRegionBroker.registerRecoveredTransactionMBean(transaction);
125        }
126    }
127
128    private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
129                                                    ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
130        registerSync(amqDestination, transaction, ack);
131    }
132
133    private void registerSync(ActiveMQDestination destination, Transaction transaction, BaseCommand command) {
134        Synchronization sync = new PreparedDestinationCompletion(this, destination, command.isMessage());
135        // ensure one per destination in the list
136        Synchronization existing = transaction.findMatching(sync);
137        if (existing != null) {
138           ((PreparedDestinationCompletion)existing).incrementOpCount();
139        } else {
140            transaction.addSynchronization(sync);
141        }
142    }
143
144    static class PreparedDestinationCompletion extends Synchronization {
145        private final TransactionBroker transactionBroker;
146        final ActiveMQDestination destination;
147        final boolean messageSend;
148        int opCount = 1;
149        public PreparedDestinationCompletion(final TransactionBroker transactionBroker, ActiveMQDestination destination, boolean messageSend) {
150            this.transactionBroker = transactionBroker;
151            this.destination = destination;
152            // rollback relevant to acks, commit to sends
153            this.messageSend = messageSend;
154        }
155
156        public void incrementOpCount() {
157            opCount++;
158        }
159
160        @Override
161        public int hashCode() {
162            return System.identityHashCode(destination) +
163                    System.identityHashCode(Boolean.valueOf(messageSend));
164        }
165
166        @Override
167        public boolean equals(Object other) {
168            return other instanceof PreparedDestinationCompletion &&
169                    destination.equals(((PreparedDestinationCompletion) other).destination) &&
170                    messageSend == ((PreparedDestinationCompletion) other).messageSend;
171        }
172
173        @Override
174        public void afterRollback() throws Exception {
175            if (!messageSend) {
176                Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false);
177                dest.clearPendingMessages(opCount);
178                dest.getDestinationStatistics().getMessages().add(opCount);
179                LOG.debug("cleared pending from afterRollback: {}", destination);
180            }
181        }
182
183        @Override
184        public void afterCommit() throws Exception {
185            Destination dest = transactionBroker.addDestination(transactionBroker.context, destination, false);
186            if (messageSend) {
187                dest.clearPendingMessages(opCount);
188                dest.getDestinationStatistics().getEnqueues().add(opCount);
189                dest.getDestinationStatistics().getMessages().add(opCount);
190                LOG.debug("cleared pending from afterCommit: {}", destination);
191            } else {
192                dest.getDestinationStatistics().getDequeues().add(opCount);
193            }
194        }
195    }
196
197    public void stop() throws Exception {
198        transactionStore.stop();
199        next.stop();
200    }
201
202    // ////////////////////////////////////////////////////////////////////////////
203    //
204    // BrokerFilter overrides
205    //
206    // ////////////////////////////////////////////////////////////////////////////
207    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
208        List<TransactionId> txs = new ArrayList<TransactionId>();
209        synchronized (xaTransactions) {
210            for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
211                Transaction tx = iter.next();
212                if (tx.isPrepared()) {
213                    LOG.debug("prepared transaction: {}", tx.getTransactionId());
214                    txs.add(tx.getTransactionId());
215                }
216            }
217        }
218        XATransactionId rc[] = new XATransactionId[txs.size()];
219        txs.toArray(rc);
220        LOG.debug("prepared transaction list size: {}", rc.length);
221        return rc;
222    }
223
224    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
225        // the transaction may have already been started.
226        if (xid.isXATransaction()) {
227            XATransaction transaction = null;
228            synchronized (xaTransactions) {
229                transaction = xaTransactions.get(xid);
230                if (transaction != null) {
231                    return;
232                }
233                transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
234                xaTransactions.put(xid, transaction);
235            }
236        } else {
237            Map<TransactionId, Transaction> transactionMap = context.getTransactions();
238            Transaction transaction = transactionMap.get(xid);
239            if (transaction != null) {
240                throw new JMSException("Transaction '" + xid + "' has already been started.");
241            }
242            transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
243            transactionMap.put(xid, transaction);
244        }
245    }
246
247    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
248        Transaction transaction = getTransaction(context, xid, false);
249        return transaction.prepare();
250    }
251
252    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
253        Transaction transaction = getTransaction(context, xid, true);
254        transaction.commit(onePhase);
255    }
256
257    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
258        Transaction transaction = getTransaction(context, xid, true);
259        transaction.rollback();
260    }
261
262    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
263        Transaction transaction = getTransaction(context, xid, true);
264        transaction.rollback();
265    }
266
267    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
268        // This method may be invoked recursively.
269        // Track original tx so that it can be restored.
270        final ConnectionContext context = consumerExchange.getConnectionContext();
271        Transaction originalTx = context.getTransaction();
272        Transaction transaction = null;
273        if (ack.isInTransaction()) {
274            transaction = getTransaction(context, ack.getTransactionId(), false);
275        }
276        context.setTransaction(transaction);
277        try {
278            next.acknowledge(consumerExchange, ack);
279        } finally {
280            context.setTransaction(originalTx);
281        }
282    }
283
284    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
285        // This method may be invoked recursively.
286        // Track original tx so that it can be restored.
287        final ConnectionContext context = producerExchange.getConnectionContext();
288        Transaction originalTx = context.getTransaction();
289        Transaction transaction = null;
290        if (message.getTransactionId() != null) {
291            transaction = getTransaction(context, message.getTransactionId(), false);
292        }
293        context.setTransaction(transaction);
294        try {
295            next.send(producerExchange, message);
296        } finally {
297            context.setTransaction(originalTx);
298        }
299    }
300
301    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
302        for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
303            try {
304                Transaction transaction = iter.next();
305                transaction.rollback();
306            } catch (Exception e) {
307                LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
308            }
309            iter.remove();
310        }
311
312        synchronized (xaTransactions) {
313            // first find all txs that belongs to the connection
314            ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
315            for (XATransaction tx : xaTransactions.values()) {
316                if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
317                    txs.add(tx);
318                }
319            }
320
321            // then remove them
322            // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
323            for (XATransaction tx : txs) {
324                try {
325                    tx.rollback();
326                } catch (Exception e) {
327                    LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
328                }
329            }
330
331        }
332        next.removeConnection(context, info, error);
333    }
334
335    // ////////////////////////////////////////////////////////////////////////////
336    //
337    // Implementation help methods.
338    //
339    // ////////////////////////////////////////////////////////////////////////////
340    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
341        Transaction transaction = null;
342        if (xid.isXATransaction()) {
343            synchronized (xaTransactions) {
344                transaction = xaTransactions.get(xid);
345            }
346        } else {
347            transaction = context.getTransactions().get(xid);
348        }
349        if (transaction != null) {
350            return transaction;
351        }
352        if (xid.isXATransaction()) {
353            XAException e = XATransaction.newXAException("Transaction '" + xid + "' has not been started.", XAException.XAER_NOTA);
354            throw e;
355        } else {
356            throw new JMSException("Transaction '" + xid + "' has not been started.");
357        }
358    }
359
360    public void removeTransaction(XATransactionId xid) {
361        synchronized (xaTransactions) {
362            xaTransactions.remove(xid);
363        }
364    }
365
366}