001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.io.IOException;
020import java.util.Set;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.atomic.AtomicReference;
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.BrokerService;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ProducerBrokerExchange;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.DestinationFilter;
029import org.apache.activemq.broker.region.Topic;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQQueue;
032import org.apache.activemq.command.ConnectionId;
033import org.apache.activemq.command.LocalTransactionId;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.util.LRUCache;
036
037import javax.jms.ResourceAllocationException;
038
039/**
040 * A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
041 */
042public class VirtualTopicInterceptor extends DestinationFilter {
043
044    private final String prefix;
045    private final String postfix;
046    private final boolean local;
047    private final boolean concurrentSend;
048    private final boolean transactedSend;
049    private final boolean dropMessageOnResourceLimit;
050    private final boolean setOriginalDestination;
051
052    private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
053
054    public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
055        super(next);
056        this.prefix = virtualTopic.getPrefix();
057        this.postfix = virtualTopic.getPostfix();
058        this.local = virtualTopic.isLocal();
059        this.concurrentSend = virtualTopic.isConcurrentSend();
060        this.transactedSend = virtualTopic.isTransactedSend();
061        this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit();
062        this.setOriginalDestination = virtualTopic.isSetOriginalDestination();
063    }
064
065    public Topic getTopic() {
066        return (Topic) this.next;
067    }
068
069    @Override
070    public void send(ProducerBrokerExchange context, Message message) throws Exception {
071        if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
072            ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
073            send(context, message, queueConsumers);
074        }
075        super.send(context, message);
076    }
077
078    @Override
079    protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
080        final Broker broker = context.getConnectionContext().getBroker();
081        final Set<Destination> destinations = broker.getDestinations(destination);
082        final int numDestinations = destinations.size();
083
084        final LocalTransactionId localBrokerTransactionToCoalesceJournalSync =
085                beginLocalTransaction(numDestinations, context.getConnectionContext(), message);
086        try {
087            if (concurrentSend && numDestinations > 1) {
088
089                final CountDownLatch concurrent = new CountDownLatch(destinations.size());
090                final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
091                final BrokerService brokerService = broker.getBrokerService();
092
093                for (final Destination dest : destinations) {
094                    if (shouldDispatch(broker, message, dest)) {
095                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
096                            @Override
097                            public void run() {
098                                try {
099                                    if (exceptionAtomicReference.get() == null) {
100                                        dest.send(context, copy(message, dest.getActiveMQDestination()));
101                                    }
102                                } catch (ResourceAllocationException e) {
103                                    if (!dropMessageOnResourceLimit) {
104                                        exceptionAtomicReference.set(e);
105                                    }
106                                } catch (Exception e) {
107                                    exceptionAtomicReference.set(e);
108                                } finally {
109                                    concurrent.countDown();
110                                }
111                            }
112                        });
113                    } else {
114                        concurrent.countDown();
115                    }
116                }
117                concurrent.await();
118                if (exceptionAtomicReference.get() != null) {
119                    throw exceptionAtomicReference.get();
120                }
121
122            } else {
123                for (final Destination dest : destinations) {
124                    if (shouldDispatch(broker, message, dest)) {
125                        try {
126                            dest.send(context, copy(message, dest.getActiveMQDestination()));
127                        } catch (ResourceAllocationException e) {
128                            if (!dropMessageOnResourceLimit) {
129                                throw e;
130                            }
131                        }
132                    }
133                }
134            }
135        } finally {
136            commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message);
137        }
138    }
139
140    private Message copy(Message original, ActiveMQDestination target) {
141        Message msg = original.copy();
142        if (setOriginalDestination) {
143            msg.setDestination(target);
144            msg.setOriginalDestination(original.getDestination());
145        }
146        return msg;
147    }
148
149    private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
150        LocalTransactionId result = null;
151        if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
152            result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
153            connectionContext.getBroker().beginTransaction(connectionContext, result);
154            connectionContext.setTransaction(connectionContext.getTransactions().get(result));
155            message.setTransactionId(result);
156        }
157        return result;
158    }
159
160    private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
161        if (tx != null) {
162            connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
163            connectionContext.getTransactions().remove(tx);
164            connectionContext.setTransaction(null);
165            message.setTransactionId(null);
166        }
167    }
168
169    protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException {
170        return true;
171    }
172
173    protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
174        ActiveMQQueue queue;
175        synchronized (cache) {
176            queue = cache.get(original);
177            if (queue == null) {
178                queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
179                cache.put(original, queue);
180            }
181        }
182        return queue;
183    }
184}