001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.io.IOException;
020import java.util.Set;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.atomic.AtomicReference;
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.BrokerService;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ProducerBrokerExchange;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.DestinationFilter;
029import org.apache.activemq.broker.region.Topic;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQQueue;
032import org.apache.activemq.command.ConnectionId;
033import org.apache.activemq.command.LocalTransactionId;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.util.LRUCache;
036
037import javax.jms.ResourceAllocationException;
038
039/**
040 * A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
041 */
042public class VirtualTopicInterceptor extends DestinationFilter {
043
044    private final String prefix;
045    private final String postfix;
046    private final boolean local;
047    private final boolean concurrentSend;
048    private final boolean transactedSend;
049    private final boolean dropMessageOnResourceLimit;
050
051    private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
052
053    public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
054        super(next);
055        this.prefix = virtualTopic.getPrefix();
056        this.postfix = virtualTopic.getPostfix();
057        this.local = virtualTopic.isLocal();
058        this.concurrentSend = virtualTopic.isConcurrentSend();
059        this.transactedSend = virtualTopic.isTransactedSend();
060        this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit();
061    }
062
063    public Topic getTopic() {
064        return (Topic) this.next;
065    }
066
067    @Override
068    public void send(ProducerBrokerExchange context, Message message) throws Exception {
069        if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
070            ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
071            send(context, message, queueConsumers);
072        }
073        super.send(context, message);
074    }
075
076    @Override
077    protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
078        final Broker broker = context.getConnectionContext().getBroker();
079        final Set<Destination> destinations = broker.getDestinations(destination);
080        final int numDestinations = destinations.size();
081
082        final LocalTransactionId localBrokerTransactionToCoalesceJournalSync =
083                beginLocalTransaction(numDestinations, context.getConnectionContext(), message);
084        try {
085            if (concurrentSend && numDestinations > 1) {
086
087                final CountDownLatch concurrent = new CountDownLatch(destinations.size());
088                final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
089                final BrokerService brokerService = broker.getBrokerService();
090
091                for (final Destination dest : destinations) {
092                    if (shouldDispatch(broker, message, dest)) {
093                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
094                            @Override
095                            public void run() {
096                                try {
097                                    if (exceptionAtomicReference.get() == null) {
098                                        dest.send(context, copy(message, dest.getActiveMQDestination()));
099                                    }
100                                } catch (ResourceAllocationException e) {
101                                    if (!dropMessageOnResourceLimit) {
102                                        exceptionAtomicReference.set(e);
103                                    }
104                                } catch (Exception e) {
105                                    exceptionAtomicReference.set(e);
106                                } finally {
107                                    concurrent.countDown();
108                                }
109                            }
110                        });
111                    } else {
112                        concurrent.countDown();
113                    }
114                }
115                concurrent.await();
116                if (exceptionAtomicReference.get() != null) {
117                    throw exceptionAtomicReference.get();
118                }
119
120            } else {
121                for (final Destination dest : destinations) {
122                    if (shouldDispatch(broker, message, dest)) {
123                        try {
124                            dest.send(context, copy(message, dest.getActiveMQDestination()));
125                        } catch (ResourceAllocationException e) {
126                            if (!dropMessageOnResourceLimit) {
127                                throw e;
128                            }
129                        }
130                    }
131                }
132            }
133        } finally {
134            commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message);
135        }
136    }
137
138    private Message copy(Message original, ActiveMQDestination target) {
139        Message msg = original.copy();
140        msg.setDestination(target);
141        msg.setOriginalDestination(original.getDestination());
142        return msg;
143    }
144
145    private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
146        LocalTransactionId result = null;
147        if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
148            result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
149            connectionContext.getBroker().beginTransaction(connectionContext, result);
150            connectionContext.setTransaction(connectionContext.getTransactions().get(result));
151            message.setTransactionId(result);
152        }
153        return result;
154    }
155
156    private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
157        if (tx != null) {
158            connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
159            connectionContext.getTransactions().remove(tx);
160            connectionContext.setTransaction(null);
161            message.setTransactionId(null);
162        }
163    }
164
165    protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException {
166        return true;
167    }
168
169    protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
170        ActiveMQQueue queue;
171        synchronized (cache) {
172            queue = cache.get(original);
173            if (queue == null) {
174                queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
175                cache.put(original, queue);
176            }
177        }
178        return queue;
179    }
180}