001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.util.regex.Matcher;
020import java.util.regex.Pattern;
021
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQQueue;
027import org.apache.activemq.command.ActiveMQTopic;
028import org.apache.activemq.filter.DestinationFilter;
029
030/**
031 * Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
032 * Topics</a> using a prefix and postfix. The virtual destination creates a
033 * wildcard that is then used to look up all active queue subscriptions which
034 * match.
035 *
036 * @org.apache.xbean.XBean
037 */
038public class VirtualTopic implements VirtualDestination {
039
040    private String prefix = "Consumer.*.";
041    private String postfix = "";
042    private String name = ">";
043    private boolean selectorAware = false;
044    private boolean local = false;
045    private boolean concurrentSend = false;
046    private boolean transactedSend = false;
047    private boolean dropOnResourceLimit = false;
048
049    @Override
050    public ActiveMQDestination getVirtualDestination() {
051        return new ActiveMQTopic(getName());
052    }
053
054    @Override
055    public Destination intercept(Destination destination) {
056        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, this) :
057                new VirtualTopicInterceptor(destination, this);
058    }
059
060    @Override
061    public ActiveMQDestination getMappedDestinations() {
062        return new ActiveMQQueue(prefix + name + postfix);
063    }
064
065    @Override
066    public Destination interceptMappedDestination(Destination destination) {
067        // do a reverse map from destination to get actual virtual destination
068        final String physicalName = destination.getActiveMQDestination().getPhysicalName();
069        final Pattern pattern = Pattern.compile(getRegex(prefix) + "(.*)" + getRegex(postfix));
070        final Matcher matcher = pattern.matcher(physicalName);
071        if (matcher.matches()) {
072            final String virtualName = matcher.group(1);
073            return new MappedQueueFilter(new ActiveMQTopic(virtualName), destination);
074        }
075        return destination;
076    }
077
078    private String getRegex(String part) {
079        StringBuilder builder = new StringBuilder();
080        for (char c : part.toCharArray()) {
081            switch (c) {
082                case '.':
083                    builder.append("\\.");
084                    break;
085                case '*':
086                    builder.append("[^\\.]*");
087                    break;
088                default:
089                    builder.append(c);
090            }
091        }
092        return builder.toString();
093    }
094
095    @Override
096    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
097        if (destination.isQueue() && destination.isPattern()) {
098            DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
099            if (filter.matches(destination)) {
100                broker.addDestination(context, destination, false);
101
102            }
103        }
104    }
105
106    @Override
107    public void remove(Destination destination) {
108    }
109
110    // Properties
111    // -------------------------------------------------------------------------
112
113    public String getPostfix() {
114        return postfix;
115    }
116
117    /**
118     * Sets any postix used to identify the queue consumers
119     */
120    public void setPostfix(String postfix) {
121        this.postfix = postfix;
122    }
123
124    public String getPrefix() {
125        return prefix;
126    }
127
128    /**
129     * Sets the prefix wildcard used to identify the queue consumers for a given
130     * topic
131     */
132    public void setPrefix(String prefix) {
133        this.prefix = prefix;
134    }
135
136    public String getName() {
137        return name;
138    }
139
140    public void setName(String name) {
141        this.name = name;
142    }
143
144    /**
145     * Indicates whether the selectors of consumers are used to determine
146     * dispatch to a virtual destination, when true only messages matching an
147     * existing consumer will be dispatched.
148     *
149     * @param selectorAware
150     *            when true take consumer selectors into consideration
151     */
152    public void setSelectorAware(boolean selectorAware) {
153        this.selectorAware = selectorAware;
154    }
155
156    public boolean isSelectorAware() {
157        return selectorAware;
158    }
159
160    public boolean isLocal() {
161        return local;
162    }
163
164    public void setLocal(boolean local) {
165        this.local = local;
166    }
167
168    @Override
169    public String toString() {
170        return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').
171                                                  append(postfix).append(',').append(selectorAware).
172                                                  append(',').append(local).toString();
173    }
174
175    public boolean isConcurrentSend() {
176        return concurrentSend;
177    }
178
179    /**
180     * When true, dispatch to matching destinations in parallel (in multiple threads)
181     * @param concurrentSend
182     */
183    public void setConcurrentSend(boolean concurrentSend) {
184        this.concurrentSend = concurrentSend;
185    }
186
187    public boolean isTransactedSend() {
188        return transactedSend;
189    }
190
191    /**
192     * When true, dispatch to matching destinations always uses a transaction.
193     * @param transactedSend
194     */
195    public void setTransactedSend(boolean transactedSend) {
196        this.transactedSend = transactedSend;
197    }
198
199    @Override
200    public int hashCode() {
201        final int prime = 31;
202        int result = 1;
203        result = prime * result + (concurrentSend ? 1231 : 1237);
204        result = prime * result + (local ? 1231 : 1237);
205        result = prime * result + ((name == null) ? 0 : name.hashCode());
206        result = prime * result + ((postfix == null) ? 0 : postfix.hashCode());
207        result = prime * result + ((prefix == null) ? 0 : prefix.hashCode());
208        result = prime * result + (selectorAware ? 1231 : 1237);
209        result = prime * result + (transactedSend ? 1231 : 1237);
210        return result;
211    }
212
213    @Override
214    public boolean equals(Object obj) {
215        if (this == obj)
216            return true;
217        if (obj == null)
218            return false;
219        if (getClass() != obj.getClass())
220            return false;
221        VirtualTopic other = (VirtualTopic) obj;
222        if (concurrentSend != other.concurrentSend)
223            return false;
224        if (local != other.local)
225            return false;
226        if (name == null) {
227            if (other.name != null)
228                return false;
229        } else if (!name.equals(other.name))
230            return false;
231        if (postfix == null) {
232            if (other.postfix != null)
233                return false;
234        } else if (!postfix.equals(other.postfix))
235            return false;
236        if (prefix == null) {
237            if (other.prefix != null)
238                return false;
239        } else if (!prefix.equals(other.prefix))
240            return false;
241        if (selectorAware != other.selectorAware)
242            return false;
243        if (transactedSend != other.transactedSend)
244            return false;
245        return true;
246    }
247
248    public boolean isDropOnResourceLimit() {
249        return dropOnResourceLimit;
250    }
251
252    public void setDropOnResourceLimit(boolean dropOnResourceLimit) {
253        this.dropOnResourceLimit = dropOnResourceLimit;
254    }
255}