001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.locks.Lock;
024import java.util.concurrent.locks.ReentrantLock;
025
026import javax.jms.JMSException;
027import javax.jms.ServerSession;
028import javax.jms.ServerSessionPool;
029import javax.jms.Session;
030import javax.resource.spi.UnavailableException;
031import javax.resource.spi.endpoint.MessageEndpoint;
032
033import org.apache.activemq.ActiveMQConnection;
034import org.apache.activemq.ActiveMQQueueSession;
035import org.apache.activemq.ActiveMQSession;
036import org.apache.activemq.ActiveMQTopicSession;
037import org.apache.activemq.command.MessageDispatch;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 *  $Date$
043 */
044public class ServerSessionPoolImpl implements ServerSessionPool {
045
046    private static final Logger LOG = LoggerFactory.getLogger(ServerSessionPoolImpl.class);
047
048    private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
049    private final int maxSessions;
050
051    private final List<ServerSessionImpl> idleSessions = new ArrayList<ServerSessionImpl>();
052    private final List<ServerSessionImpl> activeSessions = new ArrayList<ServerSessionImpl>();
053    private final Lock sessionLock = new ReentrantLock();
054    private final AtomicBoolean closing = new AtomicBoolean(false);
055
056    public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
057        this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
058        this.maxSessions = maxSessions;
059    }
060
061    private ServerSessionImpl createServerSessionImpl() throws JMSException {
062        MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
063        int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
064        final ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
065        if (connection == null) {
066            // redispatch of pending prefetched messages after disconnect can have a null connection
067            return null;
068        }
069        final ActiveMQSession session = (ActiveMQSession)connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge);
070        MessageEndpoint endpoint;
071        try {
072            int batchSize = 0;
073            if (activationSpec.getEnableBatchBooleanValue()) {
074                batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
075            }
076            if (activationSpec.isUseRAManagedTransactionEnabled()) {
077                // The RA will manage the transaction commit.
078                endpoint = createEndpoint(null);
079                return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
080            } else {
081                // Give the container an object to manage to transaction with.
082                endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
083                return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
084            }
085        } catch (UnavailableException e) {
086            // The container could be limiting us on the number of endpoints
087            // that are being created.
088            if (LOG.isDebugEnabled()) {
089                LOG.debug("Could not create an endpoint.", e);
090            }
091            session.close();
092            return null;
093        }
094    }
095
096    private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
097        MessageEndpoint endpoint;
098        endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
099        MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
100        return endpointProxy;
101    }
102
103    /**
104     */
105    public ServerSession getServerSession() throws JMSException {
106        if (LOG.isDebugEnabled()) {
107            LOG.debug("ServerSession requested.");
108        }
109        if (closing.get()) {
110            throw new JMSException("Session Pool Shutting Down.");
111        }
112        ServerSessionImpl ss = null;
113        sessionLock.lock();
114        try {
115            ss = getExistingServerSession(false);
116        } finally {
117            sessionLock.unlock();
118        }
119        if (ss != null) {
120            return ss;
121        }
122        ss = createServerSessionImpl();
123        sessionLock.lock();
124        try {
125            // We may not be able to create a session due to the container
126            // restricting us.
127            if (ss == null) {
128                if (activeSessions.isEmpty() && idleSessions.isEmpty()) {
129                    throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
130                }
131
132                ss = getExistingServerSession(true);
133            } else {
134                activeSessions.add(ss);
135            }
136        } finally {
137            sessionLock.unlock();
138        }
139        if (LOG.isDebugEnabled()) {
140            LOG.debug("Created a new session: " + ss);
141        }
142        return ss;
143
144    }
145
146    /**
147     * Must be called with sessionLock held.
148     * Returns an idle session if one exists or an active session if no more
149     * sessions can be created.  Sessions can not be created if force is true
150     * or activeSessions >= maxSessions.
151     * @param force do not check activeSessions >= maxSessions, return an active connection anyway.
152     * @return an already existing session.
153     */
154    private ServerSessionImpl getExistingServerSession(boolean force) {
155        ServerSessionImpl ss = null;
156        if (idleSessions.size() > 0) {
157            ss = idleSessions.remove(idleSessions.size() - 1);
158        }
159        if (ss != null) {
160            activeSessions.add(ss);
161            if (LOG.isDebugEnabled()) {
162                LOG.debug("Using idle session: " + ss);
163            }
164        } else if (force || activeSessions.size() >= maxSessions) {
165            // If we are at the upper limit
166            // then reuse the already created sessions..
167            // This is going to queue up messages into a session for
168            // processing.
169            ss = getExistingActiveServerSession();
170        }
171        return ss;
172    }
173
174    /**
175     * Must be called with sessionLock held.
176     * Returns the first session from activeSessions, shifting it to last.
177     * @return session
178     */
179    private ServerSessionImpl getExistingActiveServerSession() {
180        ServerSessionImpl ss = null;
181        if (!activeSessions.isEmpty()) {
182            if (activeSessions.size() > 1) {
183                // round robin
184                ss = activeSessions.remove(0);
185                activeSessions.add(ss);
186            } else {
187                ss = activeSessions.get(0);
188            }
189        }
190        if (LOG.isDebugEnabled()) {
191            LOG.debug("Reusing an active session: " + ss);
192        }
193        return ss;
194    }
195
196    public void returnToPool(ServerSessionImpl ss) {
197        sessionLock.lock();
198            activeSessions.remove(ss);
199        try {
200            // make sure we only return non-stale sessions to the pool
201            if ( ss.isStale() ) {
202                if ( LOG.isDebugEnabled() ) {
203                    LOG.debug("Discarding stale ServerSession to be returned to pool: " + ss);
204                }
205                ss.close();
206            } else {
207                if (LOG.isDebugEnabled()) {
208                    LOG.debug("ServerSession returned to pool: " + ss);
209                }
210            idleSessions.add(ss);
211            }
212        } finally {
213            sessionLock.unlock();
214        }
215        synchronized (closing) {
216            closing.notify();
217        }
218    }
219
220    public void removeFromPool(ServerSessionImpl ss) {
221        sessionLock.lock();
222        try {
223            activeSessions.remove(ss);
224        } finally {
225            sessionLock.unlock();
226        }
227        try {
228            ActiveMQSession session = (ActiveMQSession)ss.getSession();
229            List l = session.getUnconsumedMessages();
230            if (!l.isEmpty()) {
231                ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
232                if (connection != null) {
233                    for (Iterator i = l.iterator(); i.hasNext();) {
234                        MessageDispatch md = (MessageDispatch)i.next();
235                        if (connection.hasDispatcher(md.getConsumerId())) {
236                            dispatchToSession(md);
237                            LOG.trace("on remove of {} redispatch of {}", session, md);
238                        } else {
239                            LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", md, session.getConnection());
240                        }
241                    }
242                } else {
243                    LOG.trace("on remove of {} not redispatching while disconnected", session);
244                }
245            }
246        } catch (Throwable t) {
247            LOG.error("Error redispatching unconsumed messages from stale server session {}", ss, t);
248        }
249        ss.close();
250        synchronized (closing) {
251            closing.notify();
252        }
253    }
254
255    /**
256     * @param messageDispatch
257     *            the message to dispatch
258     * @throws JMSException
259     */
260    private void dispatchToSession(MessageDispatch messageDispatch)
261            throws JMSException {
262
263        ServerSession serverSession = getServerSession();
264        Session s = serverSession.getSession();
265        ActiveMQSession session = null;
266        if (s instanceof ActiveMQSession) {
267            session = (ActiveMQSession) s;
268        } else if (s instanceof ActiveMQQueueSession) {
269            session = (ActiveMQSession) s;
270        } else if (s instanceof ActiveMQTopicSession) {
271            session = (ActiveMQSession) s;
272        } else {
273            activeMQAsfEndpointWorker.getConnection()
274                    .onAsyncException(new JMSException(
275                            "Session pool provided an invalid session type: "
276                                    + s.getClass()));
277        }
278        session.dispatch(messageDispatch);
279        serverSession.start();
280    }
281
282    public void close() {
283        closing.set(true);
284        int activeCount = closeSessions();
285        // we may have to wait erroneously 250ms if an
286        // active session is removed during our wait and we
287        // are not notified
288        while (activeCount > 0) {
289            if (LOG.isDebugEnabled()) {
290                LOG.debug("Active Sessions = " + activeCount);
291            }
292            try {
293                synchronized (closing) {
294                    closing.wait(250);
295                }
296            } catch (InterruptedException e) {
297                Thread.currentThread().interrupt();
298                return;
299            }
300            activeCount = closeSessions();
301        }
302    }
303
304
305    protected int closeSessions() {
306        sessionLock.lock();
307        try {
308            for (ServerSessionImpl ss : activeSessions) {
309                try {
310                    ActiveMQSession session = (ActiveMQSession) ss.getSession();
311                    if (!session.isClosed()) {
312                        session.close();
313                    }
314                } catch (JMSException ignored) {
315                    if (LOG.isDebugEnabled()) {
316                        LOG.debug("Failed to close active running server session {}, reason:{}", ss, ignored.toString(), ignored);
317                    }
318                }
319            }
320            for (ServerSessionImpl ss : idleSessions) {
321                ss.close();
322            }
323            idleSessions.clear();
324            return activeSessions.size();
325        } finally {
326            sessionLock.unlock();
327        }
328    }
329
330    /**
331     * @return Returns the closing.
332     */
333    public boolean isClosing() {
334        return closing.get();
335    }
336
337    /**
338     * @param closing The closing to set.
339     */
340    public void setClosing(boolean closing) {
341        this.closing.set(closing);
342    }
343
344}