001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.lang.reflect.Method;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023import javax.jms.MessageListener;
024import javax.jms.MessageProducer;
025import javax.jms.ServerSession;
026import javax.jms.Session;
027import javax.resource.spi.endpoint.MessageEndpoint;
028import javax.resource.spi.work.Work;
029import javax.resource.spi.work.WorkEvent;
030import javax.resource.spi.work.WorkException;
031import javax.resource.spi.work.WorkListener;
032import javax.resource.spi.work.WorkManager;
033
034import org.apache.activemq.ActiveMQSession;
035import org.apache.activemq.ActiveMQSession.DeliveryListener;
036import org.apache.activemq.TransactionContext;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * 
042 */
043public class ServerSessionImpl implements ServerSession, InboundContext, Work, DeliveryListener {
044
045    public static final Method ON_MESSAGE_METHOD;
046    private static int nextLogId;
047
048    static {
049        try {
050            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
051                Message.class
052            });
053        } catch (Exception e) {
054            throw new ExceptionInInitializerError(e);
055        }
056    }
057
058
059    private int serverSessionId = getNextLogId();
060    private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class.getName() + ":" + serverSessionId);
061
062    private ActiveMQSession session;
063    private WorkManager workManager;
064    private MessageEndpoint endpoint;
065    private MessageProducer messageProducer;
066    private final ServerSessionPoolImpl pool;
067
068    private Object runControlMutex = new Object();
069    private boolean runningFlag;
070    /**
071     * True if an error was detected that cause this session to be stale. When a
072     * session is stale, it should not be used again for proccessing.
073     */
074    private boolean stale;
075    /**
076     * Does the TX commit need to be managed by the RA?
077     */
078    private final boolean useRAManagedTx;
079    /**
080     * The maximum number of messages to batch
081     */
082    private final int batchSize;
083    /**
084     * The current number of messages in the batch
085     */
086    private int currentBatchSize;
087
088    public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
089        this.pool = pool;
090        this.session = session;
091        this.workManager = workManager;
092        this.endpoint = endpoint;
093        this.useRAManagedTx = useRAManagedTx;
094        this.session.setMessageListener((MessageListener)endpoint);
095        this.session.setDeliveryListener(this);
096        this.batchSize = batchSize;
097    }
098
099    private static synchronized int getNextLogId() {
100        return nextLogId++;
101    }
102
103    public Session getSession() throws JMSException {
104        return session;
105    }
106
107    protected boolean isStale() {
108        return stale || !session.isRunning() || !session.isClosed();
109    }
110
111    public MessageProducer getMessageProducer() throws JMSException {
112        if (messageProducer == null) {
113            messageProducer = getSession().createProducer(null);
114        }
115        return messageProducer;
116    }
117
118    /**
119     * @see javax.jms.ServerSession#start()
120     */
121    public void start() throws JMSException {
122
123        synchronized (runControlMutex) {
124            if (runningFlag) {
125                log.debug("Start request ignored, already running.");
126                return;
127            }
128            runningFlag = true;
129        }
130
131        // We get here because we need to start a async worker.
132        log.debug("Starting run.");
133        try {
134            workManager.scheduleWork(this, WorkManager.INDEFINITE, null, new WorkListener() {
135                // The work listener is useful only for debugging...
136                public void workAccepted(WorkEvent event) {
137                    log.debug("Work accepted: " + event);
138                }
139
140                public void workRejected(WorkEvent event) {
141                    log.debug("Work rejected: " + event);
142                }
143
144                public void workStarted(WorkEvent event) {
145                    log.debug("Work started: " + event);
146                }
147
148                public void workCompleted(WorkEvent event) {
149                    log.debug("Work completed: " + event);
150                }
151
152            });
153        } catch (WorkException e) {
154            throw (JMSException)new JMSException("Start failed: " + e).initCause(e);
155        }
156    }
157
158    /**
159     * @see java.lang.Runnable#run()
160     */
161    public void run() {
162        log.debug("{} Running", this);
163        currentBatchSize = 0;
164        while (true) {
165            log.debug("{} run loop", this);
166            try {
167                InboundContextSupport.register(this);
168                if (session.isClosed()) {
169                    stale = true;
170                } else if (session.isRunning() ) {
171                    session.run();
172                } else {
173                    log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size());
174                    stale = true;
175                }
176            } catch (Throwable e) {
177                stale = true;
178                if ( log.isDebugEnabled() ) {
179                    log.debug("Endpoint {} failed to process message.", this, e);
180                } else if ( log.isInfoEnabled() ) {
181                    log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), this);
182                }
183            } finally {
184                InboundContextSupport.unregister(this);
185                log.debug("run loop end");
186                synchronized (runControlMutex) {
187                    // This endpoint may have gone stale due to error
188                    if (stale) {
189                        log.debug("Session {} stale, removing from pool", this);
190                        runningFlag = false;
191                        pool.removeFromPool(this);
192                        break;
193                    }
194                    if (!session.hasUncomsumedMessages()) {
195                        runningFlag = false;
196                        log.debug("Session {} has no unconsumed message, returning to pool", this);
197                        pool.returnToPool(this);
198                        break;
199                    } else {
200                        log.debug("Session has session has more work to do b/c of unconsumed", this);
201                    }
202                }
203            }
204        }
205        log.debug("{} Run finished", this);
206    }
207
208    /**
209     * The ActiveMQSession's run method will call back to this method before
210     * dispactching a message to the MessageListener.
211     */
212    public void beforeDelivery(ActiveMQSession session, Message msg) {
213        if (currentBatchSize == 0) {
214            try {
215                endpoint.beforeDelivery(ON_MESSAGE_METHOD);
216            } catch (Throwable e) {
217                throw new RuntimeException("Endpoint before delivery notification failure", e);
218            }
219        }
220    }
221
222    /**
223     * The ActiveMQSession's run method will call back to this method after
224     * dispactching a message to the MessageListener.
225     */
226    public void afterDelivery(ActiveMQSession session, Message msg) {
227        if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
228            currentBatchSize = 0;
229            try {
230                endpoint.afterDelivery();
231            } catch (Throwable e) {
232                throw new RuntimeException("Endpoint after delivery notification failure: " + e, e);
233            } finally {
234                TransactionContext transactionContext = session.getTransactionContext();
235                if (transactionContext != null && transactionContext.isInLocalTransaction()) {
236                    if (!useRAManagedTx) {
237                        // Sanitiy Check: If the local transaction has not been
238                        // commited..
239                        // Commit it now.
240                        log.warn("Local transaction had not been commited. Commiting now.");
241                    }
242                    try {
243                        session.commit();
244                    } catch (JMSException e) {
245                        log.info("Commit failed:", e);
246                    }
247                }
248            }
249        }
250    }
251
252    /**
253     * @see javax.resource.spi.work.Work#release()
254     */
255    public void release() {
256        log.debug("release called");
257    }
258
259    /**
260     * @see java.lang.Object#toString()
261     */
262    @Override
263    public String toString() {
264        return "ServerSessionImpl:" + serverSessionId + "{" + session +"}";
265    }
266
267    public void close() {
268        try {
269            endpoint.release();
270        } catch (Throwable e) {
271            log.debug("Endpoint did not release properly: " + e.getMessage(), e);
272        }
273        try {
274            session.close();
275        } catch (Throwable e) {
276            log.debug("Session did not close properly: " + e.getMessage(), e);
277        }
278    }
279
280}