/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.command.MessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ServerSessionPool} implementation that hands out
 * {@link ServerSessionImpl} instances for an {@link ActiveMQEndpointWorker}.
 * <p>
 * The pool tracks two lists, {@code idleSessions} and {@code activeSessions};
 * both are only read or modified while {@code sessionLock} is held. The
 * {@code closing} flag doubles as the monitor that {@link #close()} waits on
 * and that {@link #returnToPool(ServerSessionImpl)} and
 * {@link #removeFromPool(ServerSessionImpl)} notify.
 *
 * $Date$
 */
public class ServerSessionPoolImpl implements ServerSessionPool {

    private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class);

    private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
    /** Once this many sessions are active, active sessions are reused instead of creating new ones. */
    private final int maxSessions;

    /** Sessions handed back through {@link #returnToPool(ServerSessionImpl)}; access under sessionLock. */
    private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
    /** Sessions currently handed out; access under sessionLock. */
    private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
    private final Lock sessionLock = new ReentrantLock();
    /** Shutdown flag; also used as the wait/notify monitor by {@link #close()}. */
    private final AtomicBoolean closing = new AtomicBoolean(false);

    /**
     * @param activeMQAsfEndpointWorker worker supplying the connection, activation spec,
     *                                  endpoint factory and work manager
     * @param maxSessions               number of active sessions at which the pool starts
     *                                  reusing active sessions
     */
    public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
        this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
        this.maxSessions = maxSessions;
    }

    /**
     * Creates a new JMS session on the worker's connection and wraps it, together
     * with a freshly created message endpoint, in a {@link ServerSessionImpl}.
     *
     * @return the new server session, or {@code null} when the worker has no
     *         connection or the endpoint factory refused to create an endpoint
     * @throws JMSException if the JMS session cannot be created or closed
     */
    private ServerSessionImpl createServerSessionImpl() throws JMSException {
        MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
        int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
        final ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
        if (connection == null) {
            // redispatch of pending prefetched messages after disconnect can have a null connection
            return null;
        }
        final ActiveMQSession session = (ActiveMQSession) connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
        try {
            // A batch size of 0 is passed on when batching is not enabled.
            int batchSize = 0;
            if (activationSpec.getEnableBatchBooleanValue()) {
                batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
            }
            if (activationSpec.isUseRAManagedTransactionEnabled()) {
                // The RA will manage the transaction commit.
                MessageEndpoint endpoint = createEndpoint(null);
                return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
            }
            // Give the container an object to manage the transaction with.
            MessageEndpoint endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
            return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
        } catch (UnavailableException e) {
            // The container could be limiting us on the number of endpoints
            // that are being created.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Could not create an endpoint.", e);
            }
            session.close();
            return null;
        }
    }

    /**
     * Asks the worker's endpoint factory for an endpoint and wraps it in a
     * {@link MessageEndpointProxy}.
     *
     * @param txResourceProxy transaction resource handed to the factory; may be {@code null}
     * @return the proxied endpoint
     * @throws UnavailableException if the endpoint factory cannot supply an endpoint
     */
    private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
        MessageEndpoint endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
        return new MessageEndpointProxy(endpoint);
    }

    /**
     * Returns a server session: an idle one if available, otherwise a newly
     * created one, otherwise (when creation is not possible or the pool is at
     * {@code maxSessions}) an already active one.
     *
     * @return a server session registered in the active list
     * @throws JMSException if the pool is closing, or if no session exists and
     *                      none could be created
     */
    @Override
    public ServerSession getServerSession() throws JMSException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("ServerSession requested.");
        }
        if (closing.get()) {
            throw new JMSException("Session Pool Shutting Down.");
        }
        ServerSessionImpl ss;
        sessionLock.lock();
        try {
            ss = getExistingServerSession(false);
        } finally {
            sessionLock.unlock();
        }
        if (ss != null) {
            return ss;
        }
        // The new session is created without holding sessionLock.
        ss = createServerSessionImpl();
        final boolean created = ss != null;
        sessionLock.lock();
        try {
            if (created) {
                activeSessions.add(ss);
            } else {
                // We may not be able to create a session due to the container
                // restricting us.
                if (activeSessions.isEmpty() && idleSessions.isEmpty()) {
                    throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
                }
                ss = getExistingServerSession(true);
            }
        } finally {
            sessionLock.unlock();
        }
        // Only report a creation when one actually happened; reuse is logged
        // by getExistingServerSession/getExistingActiveServerSession.
        if (created && LOG.isDebugEnabled()) {
            LOG.debug("Created a new session: " + ss);
        }
        return ss;
    }

    /**
     * Must be called with sessionLock held.
     * Returns an idle session if one exists or an active session if no more
     * sessions can be created. Sessions can not be created if force is true
     * or activeSessions >= maxSessions.
     *
     * @param force do not check activeSessions >= maxSessions, return an active session anyway.
     * @return an already existing session, or {@code null} if there is no idle
     *         session and an active one should not (or cannot) be reused
     */
    private ServerSessionImpl getExistingServerSession(boolean force) {
        if (!idleSessions.isEmpty()) {
            // Take the most recently returned idle session.
            ServerSessionImpl ss = idleSessions.remove(idleSessions.size() - 1);
            if (ss != null) {
                activeSessions.add(ss);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using idle session: " + ss);
                }
                return ss;
            }
        }
        if (force || activeSessions.size() >= maxSessions) {
            // If we are at the upper limit
            // then reuse the already created sessions..
            // This is going to queue up messages into a session for
            // processing.
            return getExistingActiveServerSession();
        }
        return null;
    }

    /**
     * Must be called with sessionLock held.
     * Returns the first session from activeSessions, shifting it to last.
     *
     * @return the selected active session, or {@code null} if there is none
     */
    private ServerSessionImpl getExistingActiveServerSession() {
        ServerSessionImpl ss = null;
        if (!activeSessions.isEmpty()) {
            if (activeSessions.size() > 1) {
                // round robin
                ss = activeSessions.remove(0);
                activeSessions.add(ss);
            } else {
                ss = activeSessions.get(0);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reusing an active session: " + ss);
        }
        return ss;
    }

    /**
     * Moves a session from the active list back to the idle list, or closes it
     * if it reports itself stale, then wakes up a thread waiting in {@link #close()}.
     *
     * @param ss the session being handed back
     */
    public void returnToPool(ServerSessionImpl ss) {
        sessionLock.lock();
        try {
            // Everything after lock() lives inside try so the lock is always released.
            activeSessions.remove(ss);
            // make sure we only return non-stale sessions to the pool
            if (ss.isStale()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
                }
                ss.close();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ServerSession returned to pool: " + ss);
                }
                idleSessions.add(ss);
            }
        } finally {
            sessionLock.unlock();
        }
        synchronized (closing) {
            closing.notify();
        }
    }

    /**
     * Removes a session from the active list and closes it. Unless the pool is
     * closing, messages the session has not consumed are first redispatched to
     * another server session, provided the connection still has a dispatcher
     * for the message's consumer. Redispatch failures are logged, not propagated.
     *
     * @param ss the session to remove
     */
    public void removeFromPool(ServerSessionImpl ss) {
        sessionLock.lock();
        try {
            activeSessions.remove(ss);
        } finally {
            sessionLock.unlock();
        }
        try {
            ActiveMQSession session = (ActiveMQSession) ss.getSession();
            List<MessageDispatch> unconsumed = session.getUnconsumedMessages();
            if (!isClosing() && !unconsumed.isEmpty()) {
                ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
                if (connection != null) {
                    for (MessageDispatch md : unconsumed) {
                        if (connection.hasDispatcher(md.getConsumerId())) {
                            dispatchToSession(md);
                            LOG.trace("on remove of {} redispatch of {}", session, md);
                        } else {
                            LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", md, session.getConnection());
                        }
                    }
                } else {
                    LOG.trace("on remove of {} not redispatching while disconnected", session);
                }
            }
        } catch (Throwable t) {
            // Best effort: a failed redispatch must not prevent the session from being closed.
            LOG.error("Error redispatching unconsumed messages from stale server session {}", ss, t);
        }
        ss.close();
        synchronized (closing) {
            closing.notify();
        }
    }

    /**
     * Obtains a server session from this pool, dispatches the message to its
     * underlying session and starts the server session.
     *
     * @param messageDispatch the message to dispatch
     * @throws JMSException if no server session can be obtained, or if the
     *                      session supplied by the pool is not of a supported type
     */
    private void dispatchToSession(MessageDispatch messageDispatch)
            throws JMSException {

        ServerSession serverSession = getServerSession();
        Session s = serverSession.getSession();
        ActiveMQSession session;
        if (s instanceof ActiveMQSession) {
            session = (ActiveMQSession) s;
        } else if (s instanceof ActiveMQQueueSession) {
            session = (ActiveMQSession) s;
        } else if (s instanceof ActiveMQTopicSession) {
            session = (ActiveMQSession) s;
        } else {
            JMSException invalidType = new JMSException(
                    "Session pool provided an invalid session type: "
                            + (s == null ? null : s.getClass()));
            // The connection can be null while disconnected; only report when present.
            ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
            if (connection != null) {
                connection.onAsyncException(invalidType);
            }
            // Stop here instead of dereferencing a null session below.
            throw invalidType;
        }
        session.dispatch(messageDispatch);
        serverSession.start();
    }

    /**
     * Marks the pool as closing and repeatedly closes sessions until no active
     * session remains. Returns early, with the interrupt flag restored, if the
     * calling thread is interrupted while waiting.
     */
    public void close() {
        closing.set(true);
        LOG.debug("{} close", this);
        int activeCount = closeSessions();
        // we may have to wait erroneously 250ms if an
        // active session is removed during our wait and we
        // are not notified
        while (activeCount > 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Active Sessions = " + activeCount);
            }
            try {
                synchronized (closing) {
                    closing.wait(250);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            activeCount = closeSessions();
        }
    }


    /**
     * Closes the underlying JMS session of every active server session, removes
     * active server sessions whose JMS session was already closed, and closes
     * and discards all idle sessions.
     *
     * @return the number of sessions still in the active list afterwards
     */
    protected int closeSessions() {
        sessionLock.lock();
        try {
            // Collected first and removed afterwards so activeSessions is not
            // modified while it is being iterated.
            List<ServerSessionImpl> alreadyClosedServerSessions = new ArrayList<>(activeSessions.size());
            for (ServerSessionImpl ss : activeSessions) {
                try {
                    ActiveMQSession session = (ActiveMQSession) ss.getSession();
                    if (!session.isClosed()) {
                        session.close();
                    } else {
                        LOG.debug("Session {} already closed", session);
                        alreadyClosedServerSessions.add(ss);
                    }
                } catch (JMSException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Failed to close active running server session {}, reason:{}", ss, e.toString(), e);
                    }
                }
            }
            for (ServerSessionImpl ss : alreadyClosedServerSessions) {
                removeFromPool(ss);
            }

            for (ServerSessionImpl ss : idleSessions) {
                ss.close();
            }
            idleSessions.clear();
            return activeSessions.size();
        } finally {
            sessionLock.unlock();
        }
    }

    /**
     * @return Returns the closing.
     */
    public boolean isClosing() {
        return closing.get();
    }

    /**
     * @param closing The closing to set.
     */
    public void setClosing(boolean closing) {
        this.closing.set(closing);
    }

}