001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq; 019 020import java.util.List; 021import javax.jms.JMSException; 022import org.apache.activemq.command.ConsumerId; 023import org.apache.activemq.command.MessageDispatch; 024import org.apache.activemq.thread.Task; 025import org.apache.activemq.thread.TaskRunner; 026import org.apache.activemq.util.JMSExceptionSupport; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * A utility class used by the Session for dispatching messages asynchronously 032 * to consumers 033 * 034 * @see javax.jms.Session 035 */ 036public class ActiveMQSessionExecutor implements Task { 037 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class); 038 039 private final ActiveMQSession session; 040 private final MessageDispatchChannel messageQueue; 041 private boolean dispatchedBySessionPool; 042 private volatile TaskRunner taskRunner; 043 private boolean startedOrWarnedThatNotStarted; 044 045 ActiveMQSessionExecutor(ActiveMQSession session) { 046 this.session = session; 047 if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) { 048 this.messageQueue = new SimplePriorityMessageDispatchChannel(); 049 }else { 050 this.messageQueue = new FifoMessageDispatchChannel(); 051 } 052 } 053 054 void setDispatchedBySessionPool(boolean value) { 055 dispatchedBySessionPool = value; 056 wakeup(); 057 } 058 059 void execute(MessageDispatch message) throws InterruptedException { 060 061 if (!startedOrWarnedThatNotStarted) { 062 063 ActiveMQConnection connection = session.connection; 064 long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout(); 065 if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) { 066 startedOrWarnedThatNotStarted = true; 067 } else { 068 long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated(); 069 070 // lets only warn when a significant amount of time has passed 071 // just in case its normal operation 072 if (elapsedTime > aboutUnstartedConnectionTimeout) { 073 LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection 074 + " Received: " + message); 075 startedOrWarnedThatNotStarted = true; 076 } 077 } 078 } 079 080 if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) { 081 dispatch(message); 082 } else { 083 messageQueue.enqueue(message); 084 wakeup(); 085 } 086 } 087 088 public void wakeup() { 089 if (!dispatchedBySessionPool) { 090 if (session.isSessionAsyncDispatch()) { 091 try { 092 TaskRunner taskRunner = this.taskRunner; 093 if (taskRunner == null) { 094 synchronized (this) { 095 if (this.taskRunner == null) { 096 if (!isRunning()) { 097 // stop has been called 098 return; 099 } 100 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, 101 "ActiveMQ Session: " + session.getSessionId()); 102 } 103 taskRunner = this.taskRunner; 104 } 105 } 106 taskRunner.wakeup(); 107 } catch (InterruptedException e) { 108 Thread.currentThread().interrupt(); 109 } 110 } else { 111 while (iterate()) { 112 } 113 } 114 } 115 } 116 117 void executeFirst(MessageDispatch message) { 118 messageQueue.enqueueFirst(message); 119 wakeup(); 120 } 121 122 public boolean hasUncomsumedMessages() { 123 return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty(); 124 } 125 126 void dispatch(MessageDispatch message) { 127 // TODO - we should use a Map for this indexed by consumerId 128 for (ActiveMQMessageConsumer consumer : this.session.consumers) { 129 ConsumerId consumerId = message.getConsumerId(); 130 if (consumerId.equals(consumer.getConsumerId())) { 131 consumer.dispatch(message); 132 break; 133 } 134 } 135 } 136 137 synchronized void start() { 138 if (!messageQueue.isRunning()) { 139 messageQueue.start(); 140 if (hasUncomsumedMessages()) { 141 wakeup(); 142 } 143 } 144 } 145 146 void stop() throws JMSException { 147 try { 148 if (messageQueue.isRunning()) { 149 synchronized(this) { 150 messageQueue.stop(); 151 if (this.taskRunner != null) { 152 this.taskRunner.shutdown(); 153 this.taskRunner = null; 154 } 155 } 156 } 157 } catch (InterruptedException e) { 158 Thread.currentThread().interrupt(); 159 throw JMSExceptionSupport.create(e); 160 } 161 } 162 163 boolean isRunning() { 164 return messageQueue.isRunning(); 165 } 166 167 void close() { 168 messageQueue.close(); 169 } 170 171 void clear() { 172 messageQueue.clear(); 173 } 174 175 MessageDispatch dequeueNoWait() { 176 return messageQueue.dequeueNoWait(); 177 } 178 179 protected void clearMessagesInProgress() { 180 messageQueue.clear(); 181 } 182 183 public boolean isEmpty() { 184 return messageQueue.isEmpty(); 185 } 186 187 public boolean iterate() { 188 189 // Deliver any messages queued on the consumer to their listeners. 190 for (ActiveMQMessageConsumer consumer : this.session.consumers) { 191 if (consumer.iterate()) { 192 return true; 193 } 194 } 195 196 // No messages left queued on the listeners.. so now dispatch messages 197 // queued on the session 198 MessageDispatch message = messageQueue.dequeueNoWait(); 199 if (message == null) { 200 return false; 201 } else { 202 dispatch(message); 203 return !messageQueue.isEmpty(); 204 } 205 } 206 207 List<MessageDispatch> getUnconsumedMessages() { 208 return messageQueue.removeAll(); 209 } 210 211 void waitForQueueRestart() throws InterruptedException { 212 synchronized (messageQueue.getMutex()) { 213 while (messageQueue.isRunning() == false) { 214 if (messageQueue.isClosed()) { 215 break; 216 } 217 messageQueue.getMutex().wait(); 218 } 219 } 220 } 221}