001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.util.Set;
020
021import org.apache.activemq.broker.ConnectionContext;
022import org.apache.activemq.broker.region.BaseDestination;
023import org.apache.activemq.broker.region.Destination;
024import org.apache.activemq.broker.region.DestinationFilter;
025import org.apache.activemq.broker.region.IndirectMessageReference;
026import org.apache.activemq.broker.region.RegionBroker;
027import org.apache.activemq.broker.region.Subscription;
028import org.apache.activemq.broker.region.Topic;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.util.SubscriptionKey;
032
033/**
034 * Creates a mapped Queue that can recover messages from subscription recovery
035 * policy of its Virtual Topic.
036 */
037public class MappedQueueFilter extends DestinationFilter {
038
039    private final ActiveMQDestination virtualDestination;
040
041    public MappedQueueFilter(ActiveMQDestination virtualDestination, Destination destination) {
042        super(destination);
043        this.virtualDestination = virtualDestination;
044    }
045
046    @Override
047    public synchronized void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
048        // recover messages for first consumer only
049        boolean noSubs = getConsumers().isEmpty();
050
051        // for virtual consumer wildcard dests, only subscribe to exact match or non wildcard dests to ensure no duplicates
052        int match = sub.getActiveMQDestination().compareTo(next.getActiveMQDestination());
053        if (match == 0 || (!next.getActiveMQDestination().isPattern() && match == 1)) {
054            super.addSubscription(context, sub);
055        }
056        if (noSubs && !getConsumers().isEmpty()) {
057            // new subscription added, recover retroactive messages
058            final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
059            final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
060
061            final ActiveMQDestination newDestination = sub.getActiveMQDestination();
062            BaseDestination regionDest = null;
063
064            for (Destination virtualDest : virtualDests) {
065                if (virtualDest.getActiveMQDestination().isTopic() &&
066                        (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
067
068                    Topic topic = (Topic) getBaseDestination(virtualDest);
069                    if (topic != null) {
070                        // re-use browse() to get recovered messages
071                        final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
072
073                        // add recovered messages to subscription
074                        for (Message message : messages) {
075                            final Message copy = message.copy();
076                            copy.setOriginalDestination(message.getDestination());
077                            copy.setDestination(newDestination);
078                            if (regionDest == null) {
079                                regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
080                            }
081                            copy.setRegionDestination(regionDest);
082                            sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
083                        }
084                    }
085                }
086            }
087        }
088    }
089
090    private BaseDestination getBaseDestination(Destination virtualDest) {
091        if (virtualDest instanceof BaseDestination) {
092            return (BaseDestination) virtualDest;
093        } else if (virtualDest instanceof DestinationFilter) {
094            return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
095        }
096        return null;
097    }
098
099    @Override
100    public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
101        super.removeSubscription(context, sub, lastDeliveredSequenceId);
102    }
103
104    @Override
105    public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
106        super.deleteSubscription(context, key);
107    }
108
109    @Override
110    public String toString() {
111        return "MappedQueueFilter[" + virtualDestination + ", " + next + "]";
112    }
113}