/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.message;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;

import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.ProtonJMessage;

/**
 * Outbound transformer that maps a JMS {@link Message} onto the sections of an
 * AMQP message (header, annotations, properties, application properties, body
 * and footer) and encodes the result.
 */
public class JMSMappingOutboundTransformer extends OutboundTransformer {

    public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
    public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");

    public static final byte QUEUE_TYPE = 0x00;
    public static final byte TOPIC_TYPE = 0x01;
    public static final byte TEMP_QUEUE_TYPE = 0x02;
    public static final byte TEMP_TOPIC_TYPE = 0x03;

    // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.

    public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
    public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");

    public static final String LEGACY_QUEUE_TYPE = "queue";
    public static final String LEGACY_TOPIC_TYPE = "topic";
    public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
    public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";

    /** Size in bytes of the buffer used for the first encoding attempt. */
    private static final int INITIAL_BUFFER_SIZE = 1024 * 4;

    /** Largest value representable in an unsigned 32-bit field (2^32 - 1). */
    private static final long MAX_UINT = 0xFFFFFFFFL;

    public JMSMappingOutboundTransformer(JMSVendor vendor) {
        super(vendor);
    }

    /**
     * Converts the given JMS message to an AMQP message and encodes it.
     *
     * @param msg
     *        the JMS message to transform, may be null.
     *
     * @return the encoded message, or null when {@code msg} is null, when the
     *         message carries a true (or non-boolean) vendor "NATIVE" property,
     *         or when the message format property cannot be read as a long.
     *
     * @throws Exception if the conversion or the encoding fails.
     */
    @Override
    public EncodedMessage transform(Message msg) throws Exception {
        if (msg == null) {
            return null;
        }

        try {
            if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
                return null;
            }
        } catch (MessageFormatException e) {
            return null;
        }
        ProtonJMessage amqp = convert(msg);

        long messageFormat;
        try {
            messageFormat = msg.getLongProperty(this.messageFormatKey);
        } catch (MessageFormatException e) {
            return null;
        }

        // First attempt: encode into a fixed-size buffer; anything that does not
        // fit is counted (and discarded) by the dropping overflow buffer.
        ByteBuffer buffer = ByteBuffer.wrap(new byte[INITIAL_BUFFER_SIZE]);
        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
        if (overflow.position() > 0) {
            // The message did not fit: retry with a buffer enlarged by the overflow amount.
            buffer = ByteBuffer.wrap(new byte[INITIAL_BUFFER_SIZE + overflow.position()]);
            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
        }

        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
    }

    /**
     * Perform the conversion between JMS Message and Proton Message without
     * re-encoding it to array. This is needed because some frameworks may elect
     * to do this on their own way (Netty for instance using Nettybuffers)
     *
     * @param msg
     *        the JMS message to convert; must not be null.
     *
     * @return a Proton message built from the header, annotation, property,
     *         body and footer sections derived from {@code msg}.
     *
     * @throws JMSException if reading from the JMS message fails.
     * @throws UnsupportedEncodingException if the UTF-8 charset is not available
     *         when encoding the JMSXUserID value.
     */
    public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
        Header header = new Header();
        Properties props = new Properties();
        HashMap<Symbol, Object> daMap = null;
        HashMap<Symbol, Object> maMap = null;
        HashMap<String, Object> apMap = null;
        HashMap<String, Object> footerMap = null;
        Section body = convertBody(msg);

        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
        if (msg.getJMSType() != null) {
            props.setSubject(msg.getJMSType());
        }
        if (msg.getJMSMessageID() != null) {
            if (msg instanceof ActiveMQMessage) {
                MessageId msgId = ((ActiveMQMessage) msg).getMessageId();
                if (msgId.getTextView() != null) {
                    try {
                        props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
                    } catch (AmqpProtocolException e) {
                        // The text view could not be mapped to an id object: use it verbatim.
                        props.setMessageId(msgId.getTextView().toString());
                    }
                } else {
                    props.setMessageId(msgId.toString());
                }
            } else {
                // Not an ActiveMQ message: there is no MessageId object to inspect,
                // so fall back to the plain JMS message id instead of failing on a cast.
                props.setMessageId(msg.getJMSMessageID());
            }
        }
        if (msg.getJMSDestination() != null) {
            props.setTo(vendor.toAddress(msg.getJMSDestination()));
            if (maMap == null) {
                maMap = new HashMap<Symbol, Object>();
            }
            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));

            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
        }
        if (msg.getJMSReplyTo() != null) {
            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
            if (maMap == null) {
                maMap = new HashMap<Symbol, Object>();
            }
            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));

            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
        }
        if (msg.getJMSCorrelationID() != null) {
            props.setCorrelationId(msg.getJMSCorrelationID());
        }
        if (msg.getJMSExpiration() != 0) {
            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
            if (ttl < 0) {
                // Already expired: advertise the smallest positive TTL.
                ttl = 1;
            } else if (ttl > MAX_UINT) {
                // Clamp rather than let the narrowing cast below wrap a very long
                // TTL around to a small (or arbitrary) value.
                ttl = MAX_UINT;
            }
            // The cast keeps the low 32 bits; values up to MAX_UINT therefore keep
            // the same bit pattern the original code produced for in-range TTLs.
            header.setTtl(new UnsignedInteger((int) ttl));

            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
        }
        if (msg.getJMSTimestamp() != 0) {
            props.setCreationTime(new Date(msg.getJMSTimestamp()));
        }

        // The JMS API declares a raw Enumeration here; property names are Strings.
        @SuppressWarnings("unchecked")
        final Enumeration<String> keys = msg.getPropertyNames();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
                // Transformer bookkeeping properties are not forwarded.
                continue;
            }

            if (key.equals(firstAcquirerKey)) {
                header.setFirstAcquirer(msg.getBooleanProperty(key));
            } else if (key.startsWith("JMSXDeliveryCount")) {
                // The AMQP delivery-count field only includes prior failed delivery attempts,
                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
                if (amqpDeliveryCount > 0) {
                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
                }
            } else if (key.startsWith("JMSXUserID")) {
                String value = msg.getStringProperty(key);
                if (value != null) {
                    props.setUserId(new Binary(value.getBytes("UTF-8")));
                }
            } else if (key.startsWith("JMSXGroupID")) {
                String value = msg.getStringProperty(key);
                props.setGroupId(value);
                if (apMap == null) {
                    apMap = new HashMap<String, Object>();
                }
                apMap.put(key, value);
            } else if (key.startsWith("JMSXGroupSeq")) {
                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
                props.setGroupSequence(value);
                if (apMap == null) {
                    apMap = new HashMap<String, Object>();
                }
                apMap.put(key, value);
            } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
                if (daMap == null) {
                    daMap = new HashMap<Symbol, Object>();
                }
                String name = key.substring(prefixDeliveryAnnotationsKey.length());
                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
            } else if (key.startsWith(prefixMessageAnnotationsKey)) {
                if (maMap == null) {
                    maMap = new HashMap<Symbol, Object>();
                }
                String name = key.substring(prefixMessageAnnotationsKey.length());
                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
            } else if (key.equals(contentTypeKey)) {
                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
            } else if (key.equals(contentEncodingKey)) {
                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
            } else if (key.equals(replyToGroupIDKey)) {
                props.setReplyToGroupId(msg.getStringProperty(key));
            } else if (key.startsWith(prefixFooterKey)) {
                if (footerMap == null) {
                    footerMap = new HashMap<String, Object>();
                }
                String name = key.substring(prefixFooterKey.length());
                footerMap.put(name, msg.getObjectProperty(key));
            } else {
                // Anything not recognised above is passed through as an application property.
                if (apMap == null) {
                    apMap = new HashMap<String, Object>();
                }
                apMap.put(key, msg.getObjectProperty(key));
            }
        }

        // Sections whose backing map was never created stay null.
        MessageAnnotations ma = maMap != null ? new MessageAnnotations(maMap) : null;
        DeliveryAnnotations da = daMap != null ? new DeliveryAnnotations(daMap) : null;
        ApplicationProperties ap = apMap != null ? new ApplicationProperties(apMap) : null;
        Footer footer = footerMap != null ? new Footer(footerMap) : null;

        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
    }

    /**
     * Builds the AMQP body section that corresponds to the JMS message type.
     *
     * The type checks are deliberately independent (not else-if): if a message
     * implements more than one of the JMS body interfaces, the last matching
     * check determines the returned section.
     *
     * @param msg
     *        the JMS message whose payload is read.
     *
     * @return the body section, or null if the message is none of the known
     *         JMS body types.
     *
     * @throws JMSException if reading the message payload fails.
     */
    private static Section convertBody(Message msg) throws JMSException {
        Section body = null;
        if (msg instanceof BytesMessage) {
            BytesMessage m = (BytesMessage) msg;
            byte[] data = new byte[(int) m.getBodyLength()];
            m.readBytes(data);
            m.reset(); // Need to reset after readBytes or future readBytes
                       // calls (ex: redeliveries) will fail and return -1
            body = new Data(new Binary(data));
        }
        if (msg instanceof TextMessage) {
            body = new AmqpValue(((TextMessage) msg).getText());
        }
        if (msg instanceof MapMessage) {
            final HashMap<String, Object> map = new HashMap<String, Object>();
            final MapMessage m = (MapMessage) msg;
            // The JMS API declares a raw Enumeration here; map names are Strings.
            @SuppressWarnings("unchecked")
            final Enumeration<String> names = m.getMapNames();
            while (names.hasMoreElements()) {
                String key = names.nextElement();
                map.put(key, m.getObject(key));
            }
            body = new AmqpValue(map);
        }
        if (msg instanceof StreamMessage) {
            ArrayList<Object> list = new ArrayList<Object>();
            final StreamMessage m = (StreamMessage) msg;
            try {
                while (true) {
                    list.add(m.readObject());
                }
            } catch (MessageEOFException endOfStream) {
                // Expected: readObject reports the end of the stream by throwing.
            }
            // Same reasoning as for BytesMessage above: rewind so that a later
            // conversion of this message (ex: redelivery) sees the full stream again.
            m.reset();
            body = new AmqpSequence(list);
        }
        if (msg instanceof ObjectMessage) {
            body = new AmqpValue(((ObjectMessage) msg).getObject());
        }
        return body;
    }

    /**
     * Maps a JMS destination to the byte code stored in the destination type
     * message annotations.
     *
     * @param destination
     *        the destination to classify.
     *
     * @return one of {@link #QUEUE_TYPE}, {@link #TEMP_QUEUE_TYPE},
     *         {@link #TOPIC_TYPE} or {@link #TEMP_TOPIC_TYPE}.
     *
     * @throws IllegalArgumentException if the destination is neither a Queue nor a Topic.
     */
    private static byte destinationType(Destination destination) {
        if (destination instanceof Queue) {
            return destination instanceof TemporaryQueue ? TEMP_QUEUE_TYPE : QUEUE_TYPE;
        }
        if (destination instanceof Topic) {
            return destination instanceof TemporaryTopic ? TEMP_TOPIC_TYPE : TOPIC_TYPE;
        }

        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
    }

    /**
     * Maps a JMS destination to the legacy string form of the destination type
     * annotation. Used by legacy QPid AMQP 1.0 JMS client.
     *
     * @param destination
     *        the destination to classify.
     *
     * @return one of the LEGACY_*_TYPE string constants.
     *
     * @throws IllegalArgumentException if the destination is neither a Queue nor a Topic.
     */
    @Deprecated
    private static String destinationAttributes(Destination destination) {
        if (destination instanceof Queue) {
            return destination instanceof TemporaryQueue ? LEGACY_TEMP_QUEUE_TYPE : LEGACY_QUEUE_TYPE;
        }
        if (destination instanceof Topic) {
            return destination instanceof TemporaryTopic ? LEGACY_TEMP_TOPIC_TYPE : LEGACY_TOPIC_TYPE;
        }

        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
    }
}