001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.message;
018
019import java.util.Map;
020import java.util.Set;
021
022import javax.jms.DeliveryMode;
023import javax.jms.JMSException;
024import javax.jms.Message;
025
026import org.apache.activemq.ScheduledMessage;
027import org.apache.qpid.proton.amqp.Binary;
028import org.apache.qpid.proton.amqp.Decimal128;
029import org.apache.qpid.proton.amqp.Decimal32;
030import org.apache.qpid.proton.amqp.Decimal64;
031import org.apache.qpid.proton.amqp.Symbol;
032import org.apache.qpid.proton.amqp.UnsignedByte;
033import org.apache.qpid.proton.amqp.UnsignedInteger;
034import org.apache.qpid.proton.amqp.UnsignedLong;
035import org.apache.qpid.proton.amqp.UnsignedShort;
036import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
037import org.apache.qpid.proton.amqp.messaging.Footer;
038import org.apache.qpid.proton.amqp.messaging.Header;
039import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
040import org.apache.qpid.proton.amqp.messaging.Properties;
041
042public abstract class InboundTransformer {
043
044    JMSVendor vendor;
045
046    public static final String TRANSFORMER_NATIVE = "native";
047    public static final String TRANSFORMER_RAW = "raw";
048    public static final String TRANSFORMER_JMS = "jms";
049
050    String prefixVendor = "JMS_AMQP_";
051    String prefixDeliveryAnnotations = "DA_";
052    String prefixMessageAnnotations = "MA_";
053    String prefixFooter = "FT_";
054
055    int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
056    int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
057    long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
058
059    public InboundTransformer(JMSVendor vendor) {
060        this.vendor = vendor;
061    }
062
063    public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
064
065    public abstract String getTransformerName();
066
067    public abstract InboundTransformer getFallbackTransformer();
068
069    public int getDefaultDeliveryMode() {
070        return defaultDeliveryMode;
071    }
072
073    public void setDefaultDeliveryMode(int defaultDeliveryMode) {
074        this.defaultDeliveryMode = defaultDeliveryMode;
075    }
076
077    public int getDefaultPriority() {
078        return defaultPriority;
079    }
080
081    public void setDefaultPriority(int defaultPriority) {
082        this.defaultPriority = defaultPriority;
083    }
084
085    public long getDefaultTtl() {
086        return defaultTtl;
087    }
088
089    public void setDefaultTtl(long defaultTtl) {
090        this.defaultTtl = defaultTtl;
091    }
092
093    public String getPrefixVendor() {
094        return prefixVendor;
095    }
096
097    public void setPrefixVendor(String prefixVendor) {
098        this.prefixVendor = prefixVendor;
099    }
100
101    public JMSVendor getVendor() {
102        return vendor;
103    }
104
105    public void setVendor(JMSVendor vendor) {
106        this.vendor = vendor;
107    }
108
109    @SuppressWarnings("unchecked")
110    protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
111        Header header = amqp.getHeader();
112        if (header == null) {
113            header = new Header();
114        }
115
116        if (header.getDurable() != null) {
117            jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
118        } else {
119            jms.setJMSDeliveryMode(defaultDeliveryMode);
120        }
121
122        if (header.getPriority() != null) {
123            jms.setJMSPriority(header.getPriority().intValue());
124        } else {
125            jms.setJMSPriority(defaultPriority);
126        }
127
128        if (header.getFirstAcquirer() != null) {
129            jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
130        }
131
132        if (header.getDeliveryCount() != null) {
133            vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
134        }
135
136        final MessageAnnotations ma = amqp.getMessageAnnotations();
137        if (ma != null) {
138            for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
139                String key = entry.getKey().toString();
140                if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
141                    // Legacy annotation, JMSType value will be replaced by Subject further down if also present.
142                    jms.setJMSType(entry.getValue().toString());
143                } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
144                    long deliveryTime = ((Number) entry.getValue()).longValue();
145                    long delay = deliveryTime - System.currentTimeMillis();
146                    if (delay > 0) {
147                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
148                    }
149                } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
150                    long delay = ((Number) entry.getValue()).longValue();
151                    if (delay > 0) {
152                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
153                    }
154                } else if ("x-opt-delivery-repeat".equals(key) && entry.getValue() != null) {
155                    int repeat = ((Number) entry.getValue()).intValue();
156                    if (repeat > 0) {
157                        jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
158                    }
159                } else if ("x-opt-delivery-period".equals(key) && entry.getValue() != null) {
160                    long period = ((Number) entry.getValue()).longValue();
161                    if (period > 0) {
162                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
163                    }
164                } else if ("x-opt-delivery-cron".equals(key) && entry.getValue() != null) {
165                    String cronEntry = (String) entry.getValue();
166                    if (cronEntry != null) {
167                        jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cronEntry);
168                    }
169                }
170
171                setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
172            }
173        }
174
175        final ApplicationProperties ap = amqp.getApplicationProperties();
176        if (ap != null) {
177            for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
178                String key = entry.getKey().toString();
179                if ("JMSXGroupID".equals(key)) {
180                    vendor.setJMSXGroupID(jms, entry.getValue().toString());
181                } else if ("JMSXGroupSequence".equals(key)) {
182                    vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
183                } else if ("JMSXUserID".equals(key)) {
184                    vendor.setJMSXUserID(jms, entry.getValue().toString());
185                } else {
186                    setProperty(jms, key, entry.getValue());
187                }
188            }
189        }
190
191        final Properties properties = amqp.getProperties();
192        if (properties != null) {
193            if (properties.getMessageId() != null) {
194                jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
195            }
196            Binary userId = properties.getUserId();
197            if (userId != null) {
198                vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
199            }
200            if (properties.getTo() != null) {
201                jms.setJMSDestination(vendor.createDestination(properties.getTo()));
202            }
203            if (properties.getSubject() != null) {
204                jms.setJMSType(properties.getSubject());
205            }
206            if (properties.getReplyTo() != null) {
207                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
208            }
209            if (properties.getCorrelationId() != null) {
210                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
211            }
212            if (properties.getContentType() != null) {
213                jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
214            }
215            if (properties.getContentEncoding() != null) {
216                jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
217            }
218            if (properties.getCreationTime() != null) {
219                jms.setJMSTimestamp(properties.getCreationTime().getTime());
220            }
221            if (properties.getGroupId() != null) {
222                vendor.setJMSXGroupID(jms, properties.getGroupId());
223            }
224            if (properties.getGroupSequence() != null) {
225                vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
226            }
227            if (properties.getReplyToGroupId() != null) {
228                jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
229            }
230            if (properties.getAbsoluteExpiryTime() != null) {
231                jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
232            }
233        }
234
235        // If the jms expiration has not yet been set...
236        if (jms.getJMSExpiration() == 0) {
237            // Then lets try to set it based on the message ttl.
238            long ttl = defaultTtl;
239            if (header.getTtl() != null) {
240                ttl = header.getTtl().longValue();
241            }
242
243            if (ttl == 0) {
244                jms.setJMSExpiration(0);
245            } else {
246                jms.setJMSExpiration(System.currentTimeMillis() + ttl);
247            }
248        }
249
250        final Footer fp = amqp.getFooter();
251        if (fp != null) {
252            for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
253                String key = entry.getKey().toString();
254                setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
255            }
256        }
257    }
258
259    private void setProperty(Message msg, String key, Object value) throws JMSException {
260        if (value instanceof UnsignedLong) {
261            long v = ((UnsignedLong) value).longValue();
262            msg.setLongProperty(key, v);
263        } else if (value instanceof UnsignedInteger) {
264            long v = ((UnsignedInteger) value).longValue();
265            if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
266                msg.setIntProperty(key, (int) v);
267            } else {
268                msg.setLongProperty(key, v);
269            }
270        } else if (value instanceof UnsignedShort) {
271            int v = ((UnsignedShort) value).intValue();
272            if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
273                msg.setShortProperty(key, (short) v);
274            } else {
275                msg.setIntProperty(key, v);
276            }
277        } else if (value instanceof UnsignedByte) {
278            short v = ((UnsignedByte) value).shortValue();
279            if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
280                msg.setByteProperty(key, (byte) v);
281            } else {
282                msg.setShortProperty(key, v);
283            }
284        } else if (value instanceof Symbol) {
285            msg.setStringProperty(key, value.toString());
286        } else if (value instanceof Decimal128) {
287            msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
288        } else if (value instanceof Decimal64) {
289            msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
290        } else if (value instanceof Decimal32) {
291            msg.setFloatProperty(key, ((Decimal32) value).floatValue());
292        } else if (value instanceof Binary) {
293            msg.setStringProperty(key, value.toString());
294        } else {
295            msg.setObjectProperty(key, value);
296        }
297    }
298}