001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.util.Set; 020import java.util.concurrent.CopyOnWriteArraySet; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import org.apache.activemq.command.ConsumerId; 026import org.apache.activemq.command.ConsumerInfo; 027import org.apache.activemq.command.NetworkBridgeFilter; 028import org.apache.activemq.command.SubscriptionInfo; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Represents a network bridge interface 034 */ 035public class DemandSubscription { 036 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class); 037 038 private final ConsumerInfo remoteInfo; 039 private final ConsumerInfo localInfo; 040 private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>(); 041 private final AtomicInteger dispatched = new AtomicInteger(0); 042 private final AtomicBoolean activeWaiter = new AtomicBoolean(); 043 private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>(); 044 private SubscriptionInfo localDurableSubscriber; 045 046 private NetworkBridgeFilter networkBridgeFilter; 047 private boolean staticallyIncluded; 048 049 DemandSubscription(ConsumerInfo info) { 050 remoteInfo = info; 051 localInfo = info.copy(); 052 localInfo.setNetworkSubscription(true); 053 remoteSubsIds.add(info.getConsumerId()); 054 } 055 056 @Override 057 public String toString() { 058 return "DemandSub{" + localInfo.getConsumerId() + ",remotes:" + remoteSubsIds + "}"; 059 } 060 061 /** 062 * Increment the consumers associated with this subscription 063 * 064 * @param id 065 * @return true if added 066 */ 067 public boolean add(ConsumerId id) { 068 return remoteSubsIds.add(id); 069 } 070 071 /** 072 * Increment the consumers associated with this subscription 073 * 074 * @param id 075 * @return true if removed 076 */ 077 public boolean remove(ConsumerId id) { 078 return remoteSubsIds.remove(id); 079 } 080 081 public Set<SubscriptionInfo> getDurableRemoteSubs() { 082 return durableRemoteSubs; 083 } 084 085 /** 086 * @return true if there are no interested consumers 087 */ 088 public boolean isEmpty() { 089 return remoteSubsIds.isEmpty(); 090 } 091 092 public int size() { 093 return remoteSubsIds.size(); 094 } 095 /** 096 * @return Returns the localInfo. 097 */ 098 public ConsumerInfo getLocalInfo() { 099 return localInfo; 100 } 101 102 /** 103 * @return Returns the remoteInfo. 104 */ 105 public ConsumerInfo getRemoteInfo() { 106 return remoteInfo; 107 } 108 109 public void waitForCompletion() { 110 if (dispatched.get() > 0) { 111 LOG.debug("Waiting for completion for sub: {}, dispatched: {}", localInfo.getConsumerId(), this.dispatched.get()); 112 activeWaiter.set(true); 113 if (dispatched.get() > 0) { 114 synchronized (activeWaiter) { 115 try { 116 activeWaiter.wait(TimeUnit.SECONDS.toMillis(30)); 117 } catch (InterruptedException ignored) { 118 } 119 } 120 if (this.dispatched.get() > 0) { 121 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially {} duplicate forwards", this.dispatched.get()); 122 } 123 } 124 } 125 } 126 127 public void decrementOutstandingResponses() { 128 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) { 129 synchronized (activeWaiter) { 130 activeWaiter.notifyAll(); 131 } 132 } 133 } 134 135 public boolean incrementOutstandingResponses() { 136 dispatched.incrementAndGet(); 137 if (activeWaiter.get()) { 138 decrementOutstandingResponses(); 139 return false; 140 } 141 return true; 142 } 143 144 public NetworkBridgeFilter getNetworkBridgeFilter() { 145 return networkBridgeFilter; 146 } 147 148 public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) { 149 this.networkBridgeFilter = networkBridgeFilter; 150 } 151 152 public SubscriptionInfo getLocalDurableSubscriber() { 153 return localDurableSubscriber; 154 } 155 156 public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) { 157 this.localDurableSubscriber = localDurableSubscriber; 158 } 159 160 public boolean isStaticallyIncluded() { 161 return staticallyIncluded; 162 } 163 164 public void setStaticallyIncluded(boolean staticallyIncluded) { 165 this.staticallyIncluded = staticallyIncluded; 166 } 167}