001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.state; 019 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.ConnectionInfo; 034import org.apache.activemq.command.ConsumerId; 035import org.apache.activemq.command.ConsumerInfo; 036import org.apache.activemq.command.DestinationInfo; 037import org.apache.activemq.command.SessionId; 038import org.apache.activemq.command.SessionInfo; 039import org.apache.activemq.command.TransactionId; 040 041public class ConnectionState { 042 043 ConnectionInfo info; 044 private final ConcurrentMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>(); 045 private final ConcurrentMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>(); 046 private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>()); 047 private final AtomicBoolean shutdown = new AtomicBoolean(false); 048 private boolean connectionInterruptProcessingComplete = true; 049 private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers; 050 051 public ConnectionState(ConnectionInfo info) { 052 this.info = info; 053 // Add the default session id. 054 addSession(new SessionInfo(info, -1)); 055 } 056 057 @Override 058 public String toString() { 059 return info.toString(); 060 } 061 062 public void reset(ConnectionInfo info) { 063 this.info = info; 064 transactions.clear(); 065 sessions.clear(); 066 tempDestinations.clear(); 067 shutdown.set(false); 068 // Add the default session id. 069 addSession(new SessionInfo(info, -1)); 070 } 071 072 public void addTempDestination(DestinationInfo info) { 073 checkShutdown(); 074 tempDestinations.add(info); 075 } 076 077 public void removeTempDestination(ActiveMQDestination destination) { 078 for (Iterator<DestinationInfo> iter = tempDestinations.iterator(); iter.hasNext();) { 079 DestinationInfo di = iter.next(); 080 if (di.getDestination().equals(destination)) { 081 iter.remove(); 082 } 083 } 084 } 085 086 public void addTransactionState(TransactionId id) { 087 checkShutdown(); 088 transactions.put(id, new TransactionState(id)); 089 } 090 091 public TransactionState getTransactionState(TransactionId id) { 092 return transactions.get(id); 093 } 094 095 public Collection<TransactionState> getTransactionStates() { 096 return transactions.values(); 097 } 098 099 public TransactionState removeTransactionState(TransactionId id) { 100 return transactions.remove(id); 101 } 102 103 public void addSession(SessionInfo info) { 104 checkShutdown(); 105 sessions.put(info.getSessionId(), new SessionState(info)); 106 } 107 108 public SessionState removeSession(SessionId id) { 109 return sessions.remove(id); 110 } 111 112 public SessionState getSessionState(SessionId id) { 113 return sessions.get(id); 114 } 115 116 public ConnectionInfo getInfo() { 117 return info; 118 } 119 120 public Set<SessionId> getSessionIds() { 121 return sessions.keySet(); 122 } 123 124 public List<DestinationInfo> getTempDestinations() { 125 return tempDestinations; 126 } 127 128 public Collection<SessionState> getSessionStates() { 129 return sessions.values(); 130 } 131 132 private void checkShutdown() { 133 if (shutdown.get()) { 134 throw new IllegalStateException("Disposed"); 135 } 136 } 137 138 public void shutdown() { 139 if (shutdown.compareAndSet(false, true)) { 140 for (Iterator<SessionState> iter = sessions.values().iterator(); iter.hasNext();) { 141 SessionState ss = iter.next(); 142 ss.shutdown(); 143 } 144 } 145 } 146 147 public Map<ConsumerId, ConsumerInfo> getRecoveringPullConsumers() { 148 if (recoveringPullConsumers == null) { 149 recoveringPullConsumers = new HashMap<ConsumerId, ConsumerInfo>(); 150 } 151 return recoveringPullConsumers; 152 } 153 154 public void setConnectionInterruptProcessingComplete(boolean connectionInterruptProcessingComplete) { 155 this.connectionInterruptProcessingComplete = connectionInterruptProcessingComplete; 156 } 157 158 public boolean isConnectionInterruptProcessingComplete() { 159 return connectionInterruptProcessingComplete; 160 } 161}