001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022
023import org.apache.activemq.broker.region.MessageReference;
024import org.apache.activemq.broker.region.Subscription;
025import org.apache.activemq.command.ConsumerId;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.filter.MessageEvaluationContext;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * dispatch policy that ignores lower priority duplicate network consumers,
033 * used in conjunction with network bridge suppresDuplicateTopicSubscriptions
034 * 
035 * @org.apache.xbean.XBean
036 */
037public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy {
038
039    private static final Logger LOG = LoggerFactory.getLogger(PriorityNetworkDispatchPolicy.class);
040    @Override
041    public boolean dispatch(MessageReference node,
042            MessageEvaluationContext msgContext,
043            List<Subscription> consumers) throws Exception {
044        
045        List<Subscription> duplicateFreeSubs = new ArrayList<Subscription>();
046        synchronized (consumers) {
047            for (Subscription sub: consumers) {
048                ConsumerInfo info = sub.getConsumerInfo();
049                if (info.isNetworkSubscription()) {    
050                    boolean highestPrioritySub = true;
051                    for (Iterator<Subscription> it =  duplicateFreeSubs.iterator(); it.hasNext(); ) {
052                        Subscription candidate = it.next();
053                        if (matches(candidate, info)) {
054                            if (hasLowerPriority(candidate, info)) {
055                                it.remove();
056                            } else {
057                                // higher priority matching sub exists
058                                highestPrioritySub = false;
059                                LOG.debug("ignoring lower priority: {} [{}, {}] in favour of: {} [{}, {}]",
060                                        new Object[]{ candidate,
061                                                candidate.getConsumerInfo().getNetworkConsumerIds(),
062                                                candidate.getConsumerInfo().getNetworkConsumerIds(),
063                                                sub,
064                                                sub.getConsumerInfo().getNetworkConsumerIds(),
065                                                sub.getConsumerInfo().getNetworkConsumerIds() });
066                            }
067                        }
068                    }
069                    if (highestPrioritySub) {
070                        duplicateFreeSubs.add(sub);
071                    } 
072                } else {
073                    duplicateFreeSubs.add(sub);
074                }
075            }
076        }
077        
078        return super.dispatch(node, msgContext, duplicateFreeSubs);
079    }
080
081    private boolean hasLowerPriority(Subscription candidate,
082            ConsumerInfo info) {
083       return candidate.getConsumerInfo().getPriority() < info.getPriority();
084    }
085
086    private boolean matches(Subscription candidate, ConsumerInfo info) {
087        boolean matched = false;
088        for (ConsumerId candidateId: candidate.getConsumerInfo().getNetworkConsumerIds()) {
089            for (ConsumerId subId: info.getNetworkConsumerIds()) {
090                if (candidateId.equals(subId)) {
091                    matched = true;
092                    break;
093                }
094            }
095        }
096        return matched;
097    }
098
099}