001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.message; 018 019import java.util.Map; 020import java.util.Set; 021 022import javax.jms.DeliveryMode; 023import javax.jms.JMSException; 024import javax.jms.Message; 025 026import org.apache.activemq.ScheduledMessage; 027import org.apache.qpid.proton.amqp.Binary; 028import org.apache.qpid.proton.amqp.Decimal128; 029import org.apache.qpid.proton.amqp.Decimal32; 030import org.apache.qpid.proton.amqp.Decimal64; 031import org.apache.qpid.proton.amqp.Symbol; 032import org.apache.qpid.proton.amqp.UnsignedByte; 033import org.apache.qpid.proton.amqp.UnsignedInteger; 034import org.apache.qpid.proton.amqp.UnsignedLong; 035import org.apache.qpid.proton.amqp.UnsignedShort; 036import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; 037import org.apache.qpid.proton.amqp.messaging.Footer; 038import org.apache.qpid.proton.amqp.messaging.Header; 039import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; 040import org.apache.qpid.proton.amqp.messaging.Properties; 041 042public abstract class InboundTransformer { 043 044 JMSVendor vendor; 045 046 public static final String TRANSFORMER_NATIVE = "native"; 047 public static final String TRANSFORMER_RAW = "raw"; 048 public static final String TRANSFORMER_JMS = "jms"; 049 050 String prefixVendor = "JMS_AMQP_"; 051 String prefixDeliveryAnnotations = "DA_"; 052 String prefixMessageAnnotations = "MA_"; 053 String prefixFooter = "FT_"; 054 055 int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT; 056 int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; 057 long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; 058 059 public InboundTransformer(JMSVendor vendor) { 060 this.vendor = vendor; 061 } 062 063 public abstract Message transform(EncodedMessage amqpMessage) throws Exception; 064 065 public abstract String getTransformerName(); 066 067 public abstract InboundTransformer getFallbackTransformer(); 068 069 public int getDefaultDeliveryMode() { 070 return defaultDeliveryMode; 071 } 072 073 public void setDefaultDeliveryMode(int defaultDeliveryMode) { 074 this.defaultDeliveryMode = defaultDeliveryMode; 075 } 076 077 public int getDefaultPriority() { 078 return defaultPriority; 079 } 080 081 public void setDefaultPriority(int defaultPriority) { 082 this.defaultPriority = defaultPriority; 083 } 084 085 public long getDefaultTtl() { 086 return defaultTtl; 087 } 088 089 public void setDefaultTtl(long defaultTtl) { 090 this.defaultTtl = defaultTtl; 091 } 092 093 public String getPrefixVendor() { 094 return prefixVendor; 095 } 096 097 public void setPrefixVendor(String prefixVendor) { 098 this.prefixVendor = prefixVendor; 099 } 100 101 public JMSVendor getVendor() { 102 return vendor; 103 } 104 105 public void setVendor(JMSVendor vendor) { 106 this.vendor = vendor; 107 } 108 109 @SuppressWarnings("unchecked") 110 protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { 111 Header header = amqp.getHeader(); 112 if (header == null) { 113 header = new Header(); 114 } 115 116 if (header.getDurable() != null) { 117 jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 118 } else { 119 jms.setJMSDeliveryMode(defaultDeliveryMode); 120 } 121 122 if (header.getPriority() != null) { 123 jms.setJMSPriority(header.getPriority().intValue()); 124 } else { 125 jms.setJMSPriority(defaultPriority); 126 } 127 128 if (header.getFirstAcquirer() != null) { 129 jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); 130 } 131 132 if (header.getDeliveryCount() != null) { 133 vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); 134 } 135 136 final MessageAnnotations ma = amqp.getMessageAnnotations(); 137 if (ma != null) { 138 for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { 139 String key = entry.getKey().toString(); 140 if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { 141 // Legacy annotation, JMSType value will be replaced by Subject further down if also present. 142 jms.setJMSType(entry.getValue().toString()); 143 } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { 144 long deliveryTime = ((Number) entry.getValue()).longValue(); 145 long delay = deliveryTime - System.currentTimeMillis(); 146 if (delay > 0) { 147 jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); 148 } 149 } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { 150 long delay = ((Number) entry.getValue()).longValue(); 151 if (delay > 0) { 152 jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); 153 } 154 } else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) { 155 int repeat = ((Number) entry.getValue()).intValue(); 156 if (repeat > 0) { 157 jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); 158 } 159 } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) { 160 long period = ((Number) entry.getValue()).longValue(); 161 if (period > 0) { 162 jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); 163 } 164 } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) { 165 String cronEntry = (String) entry.getValue(); 166 if (cronEntry != null) { 167 jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry); 168 } 169 } 170 171 setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); 172 } 173 } 174 175 final ApplicationProperties ap = amqp.getApplicationProperties(); 176 if (ap != null) { 177 for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) { 178 String key = entry.getKey().toString(); 179 if ("JMSXGroupID".equals(key)) { 180 vendor.setJMSXGroupID(jms, entry.getValue().toString()); 181 } else if ("JMSXGroupSequence".equals(key)) { 182 vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); 183 } else if ("JMSXUserID".equals(key)) { 184 vendor.setJMSXUserID(jms, entry.getValue().toString()); 185 } else { 186 setProperty(jms, key, entry.getValue()); 187 } 188 } 189 } 190 191 final Properties properties = amqp.getProperties(); 192 if (properties != null) { 193 if (properties.getMessageId() != null) { 194 jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId())); 195 } 196 Binary userId = properties.getUserId(); 197 if (userId != null) { 198 vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); 199 } 200 if (properties.getTo() != null) { 201 jms.setJMSDestination(vendor.createDestination(properties.getTo())); 202 } 203 if (properties.getSubject() != null) { 204 jms.setJMSType(properties.getSubject()); 205 } 206 if (properties.getReplyTo() != null) { 207 jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); 208 } 209 if (properties.getCorrelationId() != null) { 210 jms.setJMSCorrelationID(properties.getCorrelationId().toString()); 211 } 212 if (properties.getContentType() != null) { 213 jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); 214 } 215 if (properties.getContentEncoding() != null) { 216 jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); 217 } 218 if (properties.getCreationTime() != null) { 219 jms.setJMSTimestamp(properties.getCreationTime().getTime()); 220 } 221 if (properties.getGroupId() != null) { 222 vendor.setJMSXGroupID(jms, properties.getGroupId()); 223 } 224 if (properties.getGroupSequence() != null) { 225 vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); 226 } 227 if (properties.getReplyToGroupId() != null) { 228 jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); 229 } 230 if (properties.getAbsoluteExpiryTime() != null) { 231 jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); 232 } 233 } 234 235 // If the jms expiration has not yet been set... 236 if (jms.getJMSExpiration() == 0) { 237 // Then lets try to set it based on the message ttl. 238 long ttl = defaultTtl; 239 if (header.getTtl() != null) { 240 ttl = header.getTtl().longValue(); 241 } 242 243 if (ttl == 0) { 244 jms.setJMSExpiration(0); 245 } else { 246 jms.setJMSExpiration(System.currentTimeMillis() + ttl); 247 } 248 } 249 250 final Footer fp = amqp.getFooter(); 251 if (fp != null) { 252 for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) { 253 String key = entry.getKey().toString(); 254 setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); 255 } 256 } 257 } 258 259 private void setProperty(Message msg, String key, Object value) throws JMSException { 260 if (value instanceof UnsignedLong) { 261 long v = ((UnsignedLong) value).longValue(); 262 msg.setLongProperty(key, v); 263 } else if (value instanceof UnsignedInteger) { 264 long v = ((UnsignedInteger) value).longValue(); 265 if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { 266 msg.setIntProperty(key, (int) v); 267 } else { 268 msg.setLongProperty(key, v); 269 } 270 } else if (value instanceof UnsignedShort) { 271 int v = ((UnsignedShort) value).intValue(); 272 if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { 273 msg.setShortProperty(key, (short) v); 274 } else { 275 msg.setIntProperty(key, v); 276 } 277 } else if (value instanceof UnsignedByte) { 278 short v = ((UnsignedByte) value).shortValue(); 279 if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { 280 msg.setByteProperty(key, (byte) v); 281 } else { 282 msg.setShortProperty(key, v); 283 } 284 } else if (value instanceof Symbol) { 285 msg.setStringProperty(key, value.toString()); 286 } else if (value instanceof Decimal128) { 287 msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); 288 } else if (value instanceof Decimal64) { 289 msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); 290 } else if (value instanceof Decimal32) { 291 msg.setFloatProperty(key, ((Decimal32) value).floatValue()); 292 } else if (value instanceof Binary) { 293 msg.setStringProperty(key, value.toString()); 294 } else { 295 msg.setObjectProperty(key, value); 296 } 297 } 298}