001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.atomic.AtomicLong; 022 023import javax.jms.Destination; 024import javax.jms.IllegalStateException; 025import javax.jms.InvalidDestinationException; 026import javax.jms.JMSException; 027import javax.jms.Message; 028 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.ProducerAck; 031import org.apache.activemq.command.ProducerId; 032import org.apache.activemq.command.ProducerInfo; 033import org.apache.activemq.management.JMSProducerStatsImpl; 034import org.apache.activemq.management.StatsCapable; 035import org.apache.activemq.management.StatsImpl; 036import org.apache.activemq.usage.MemoryUsage; 037import org.apache.activemq.util.IntrospectionSupport; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a 043 * destination. A <CODE>MessageProducer</CODE> object is created by passing a 044 * <CODE>Destination</CODE> object to a message-producer creation method 045 * supplied by a session. 046 * <P> 047 * <CODE>MessageProducer</CODE> is the parent interface for all message 048 * producers. 049 * <P> 050 * A client also has the option of creating a message producer without supplying 051 * a destination. In this case, a destination must be provided with every send 052 * operation. A typical use for this kind of message producer is to send replies 053 * to requests using the request's <CODE>JMSReplyTo</CODE> destination. 054 * <P> 055 * A client can specify a default delivery mode, priority, and time to live for 056 * messages sent by a message producer. It can also specify the delivery mode, 057 * priority, and time to live for an individual message. 058 * <P> 059 * A client can specify a time-to-live value in milliseconds for each message it 060 * sends. This value defines a message expiration time that is the sum of the 061 * message's time-to-live and the GMT when it is sent (for transacted sends, 062 * this is the time the client sends the message, not the time the transaction 063 * is committed). 064 * <P> 065 * A JMS provider should do its best to expire messages accurately; however, the 066 * JMS API does not define the accuracy provided. 067 * 068 * 069 * @see javax.jms.TopicPublisher 070 * @see javax.jms.QueueSender 071 * @see javax.jms.Session#createProducer 072 */ 073public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable { 074 075 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class); 076 077 protected ProducerInfo info; 078 protected boolean closed; 079 080 private final JMSProducerStatsImpl stats; 081 private AtomicLong messageSequence; 082 private final long startTime; 083 private MessageTransformer transformer; 084 private MemoryUsage producerWindow; 085 086 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { 087 super(session); 088 this.info = new ProducerInfo(producerId); 089 this.info.setWindowSize(session.connection.getProducerWindowSize()); 090 // Allows the options on the destination to configure the producerInfo 091 if (destination != null && destination.getOptions() != null) { 092 Map<String, Object> options = IntrospectionSupport.extractProperties( 093 new HashMap<String, Object>(destination.getOptions()), "producer."); 094 IntrospectionSupport.setProperties(this.info, options); 095 if (options.size() > 0) { 096 String msg = "There are " + options.size() 097 + " producer options that couldn't be set on the producer." 098 + " Check the options are spelled correctly." 099 + " Unknown parameters=[" + options + "]." 100 + " This producer cannot be started."; 101 LOG.warn(msg); 102 throw new ConfigurationException(msg); 103 } 104 } 105 106 this.info.setDestination(destination); 107 108 // Enable producer window flow control if protocol >= 3 and the window size > 0 109 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { 110 producerWindow = new MemoryUsage("Producer Window: " + producerId); 111 producerWindow.setExecutor(session.getConnectionExecutor()); 112 producerWindow.setLimit(this.info.getWindowSize()); 113 producerWindow.start(); 114 } 115 116 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; 117 this.defaultPriority = Message.DEFAULT_PRIORITY; 118 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; 119 this.startTime = System.currentTimeMillis(); 120 this.messageSequence = new AtomicLong(0); 121 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); 122 try { 123 this.session.addProducer(this); 124 this.session.syncSendPacket(info); 125 } catch (JMSException e) { 126 this.session.removeProducer(this); 127 throw e; 128 } 129 this.setSendTimeout(sendTimeout); 130 setTransformer(session.getTransformer()); 131 } 132 133 @Override 134 public StatsImpl getStats() { 135 return stats; 136 } 137 138 public JMSProducerStatsImpl getProducerStats() { 139 return stats; 140 } 141 142 /** 143 * Gets the destination associated with this <CODE>MessageProducer</CODE>. 144 * 145 * @return this producer's <CODE>Destination/ <CODE> 146 * @throws JMSException if the JMS provider fails to close the producer due to 147 * some internal error. 148 * @since 1.1 149 */ 150 @Override 151 public Destination getDestination() throws JMSException { 152 checkClosed(); 153 return this.info.getDestination(); 154 } 155 156 /** 157 * Closes the message producer. 158 * <P> 159 * Since a provider may allocate some resources on behalf of a <CODE> 160 * MessageProducer</CODE> 161 * outside the Java virtual machine, clients should close them when they are 162 * not needed. Relying on garbage collection to eventually reclaim these 163 * resources may not be timely enough. 164 * 165 * @throws JMSException if the JMS provider fails to close the producer due 166 * to some internal error. 167 */ 168 @Override 169 public void close() throws JMSException { 170 if (!closed) { 171 dispose(); 172 this.session.asyncSendPacket(info.createRemoveCommand()); 173 } 174 } 175 176 @Override 177 public void dispose() { 178 if (!closed) { 179 this.session.removeProducer(this); 180 if (producerWindow != null) { 181 producerWindow.stop(); 182 } 183 closed = true; 184 } 185 } 186 187 /** 188 * Check if the instance of this producer has been closed. 189 * 190 * @throws IllegalStateException 191 */ 192 @Override 193 protected void checkClosed() throws IllegalStateException { 194 if (closed) { 195 throw new IllegalStateException("The producer is closed"); 196 } 197 } 198 199 /** 200 * Sends a message to a destination for an unidentified message producer, 201 * specifying delivery mode, priority and time to live. 202 * <P> 203 * Typically, a message producer is assigned a destination at creation time; 204 * however, the JMS API also supports unidentified message producers, which 205 * require that the destination be supplied every time a message is sent. 206 * 207 * @param destination the destination to send this message to 208 * @param message the message to send 209 * @param deliveryMode the delivery mode to use 210 * @param priority the priority for this message 211 * @param timeToLive the message's lifetime (in milliseconds) 212 * @throws JMSException if the JMS provider fails to send the message due to 213 * some internal error. 214 * @throws UnsupportedOperationException if an invalid destination is 215 * specified. 216 * @throws InvalidDestinationException if a client uses this method with an 217 * invalid destination. 218 * @see javax.jms.Session#createProducer 219 * @since 1.1 220 */ 221 @Override 222 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { 223 this.send(destination, message, deliveryMode, priority, timeToLive, null); 224 } 225 226 public void send(Message message, AsyncCallback onComplete) throws JMSException { 227 this.send(this.getDestination(), 228 message, 229 this.defaultDeliveryMode, 230 this.defaultPriority, 231 this.defaultTimeToLive, onComplete); 232 } 233 234 public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException { 235 this.send(destination, 236 message, 237 this.defaultDeliveryMode, 238 this.defaultPriority, 239 this.defaultTimeToLive, 240 onComplete); 241 } 242 243 public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { 244 this.send(this.getDestination(), 245 message, 246 deliveryMode, 247 priority, 248 timeToLive, 249 onComplete); 250 } 251 252 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { 253 checkClosed(); 254 if (destination == null) { 255 if (info.getDestination() == null) { 256 throw new UnsupportedOperationException("A destination must be specified."); 257 } 258 throw new InvalidDestinationException("Don't understand null destinations"); 259 } 260 261 ActiveMQDestination dest; 262 if (destination.equals(info.getDestination())) { 263 dest = (ActiveMQDestination)destination; 264 } else if (info.getDestination() == null) { 265 dest = ActiveMQDestination.transform(destination); 266 } else { 267 throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); 268 } 269 if (dest == null) { 270 throw new JMSException("No destination specified"); 271 } 272 273 if (transformer != null) { 274 Message transformedMessage = transformer.producerTransform(session, this, message); 275 if (transformedMessage != null) { 276 message = transformedMessage; 277 } 278 } 279 280 if (producerWindow != null) { 281 try { 282 producerWindow.waitForSpace(); 283 } catch (InterruptedException e) { 284 throw new JMSException("Send aborted due to thread interrupt."); 285 } 286 } 287 288 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); 289 290 stats.onMessage(); 291 } 292 293 public MessageTransformer getTransformer() { 294 return transformer; 295 } 296 297 /** 298 * Sets the transformer used to transform messages before they are sent on 299 * to the JMS bus 300 */ 301 public void setTransformer(MessageTransformer transformer) { 302 this.transformer = transformer; 303 } 304 305 /** 306 * @return the time in milli second when this object was created. 307 */ 308 protected long getStartTime() { 309 return this.startTime; 310 } 311 312 /** 313 * @return Returns the messageSequence. 314 */ 315 protected long getMessageSequence() { 316 return messageSequence.incrementAndGet(); 317 } 318 319 /** 320 * @param messageSequence The messageSequence to set. 321 */ 322 protected void setMessageSequence(AtomicLong messageSequence) { 323 this.messageSequence = messageSequence; 324 } 325 326 /** 327 * @return Returns the info. 328 */ 329 protected ProducerInfo getProducerInfo() { 330 return this.info != null ? this.info : null; 331 } 332 333 /** 334 * @param info The info to set 335 */ 336 protected void setProducerInfo(ProducerInfo info) { 337 this.info = info; 338 } 339 340 @Override 341 public String toString() { 342 return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; 343 } 344 345 public void onProducerAck(ProducerAck pa) { 346 if (this.producerWindow != null) { 347 this.producerWindow.decreaseUsage(pa.getSize()); 348 } 349 } 350 351}