001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.message;
018
019import java.io.UnsupportedEncodingException;
020import java.nio.ByteBuffer;
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.Enumeration;
024import java.util.HashMap;
025
026import javax.jms.BytesMessage;
027import javax.jms.DeliveryMode;
028import javax.jms.Destination;
029import javax.jms.JMSException;
030import javax.jms.MapMessage;
031import javax.jms.Message;
032import javax.jms.MessageEOFException;
033import javax.jms.MessageFormatException;
034import javax.jms.ObjectMessage;
035import javax.jms.Queue;
036import javax.jms.StreamMessage;
037import javax.jms.TemporaryQueue;
038import javax.jms.TemporaryTopic;
039import javax.jms.TextMessage;
040import javax.jms.Topic;
041
042import org.apache.activemq.command.ActiveMQMessage;
043import org.apache.activemq.command.MessageId;
044import org.apache.activemq.transport.amqp.AmqpProtocolException;
045import org.apache.qpid.proton.amqp.Binary;
046import org.apache.qpid.proton.amqp.Symbol;
047import org.apache.qpid.proton.amqp.UnsignedByte;
048import org.apache.qpid.proton.amqp.UnsignedInteger;
049import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
050import org.apache.qpid.proton.amqp.messaging.AmqpValue;
051import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
052import org.apache.qpid.proton.amqp.messaging.Data;
053import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
054import org.apache.qpid.proton.amqp.messaging.Footer;
055import org.apache.qpid.proton.amqp.messaging.Header;
056import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
057import org.apache.qpid.proton.amqp.messaging.Properties;
058import org.apache.qpid.proton.amqp.messaging.Section;
059import org.apache.qpid.proton.codec.CompositeWritableBuffer;
060import org.apache.qpid.proton.codec.DroppingWritableBuffer;
061import org.apache.qpid.proton.codec.WritableBuffer;
062import org.apache.qpid.proton.message.ProtonJMessage;
063
064public class JMSMappingOutboundTransformer extends OutboundTransformer {
065
066    public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
067    public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
068
069    public static final byte QUEUE_TYPE = 0x00;
070    public static final byte TOPIC_TYPE = 0x01;
071    public static final byte TEMP_QUEUE_TYPE = 0x02;
072    public static final byte TEMP_TOPIC_TYPE = 0x03;
073
074    // Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
075
076    public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
077    public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
078
079    public static final String LEGACY_QUEUE_TYPE = "queue";
080    public static final String LEGACY_TOPIC_TYPE = "topic";
081    public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
082    public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
083
084    public JMSMappingOutboundTransformer(JMSVendor vendor) {
085        super(vendor);
086    }
087
088    @Override
089    public EncodedMessage transform(Message msg) throws Exception {
090        if (msg == null) {
091            return null;
092        }
093
094        try {
095            if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
096                return null;
097            }
098        } catch (MessageFormatException e) {
099            return null;
100        }
101        ProtonJMessage amqp = convert(msg);
102
103        long messageFormat;
104        try {
105            messageFormat = msg.getLongProperty(this.messageFormatKey);
106        } catch (MessageFormatException e) {
107            return null;
108        }
109
110        ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
111        final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
112        int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
113        if (overflow.position() > 0) {
114            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
115            c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
116        }
117
118        return new EncodedMessage(messageFormat, buffer.array(), 0, c);
119    }
120
121    /**
122     * Perform the conversion between JMS Message and Proton Message without
123     * re-encoding it to array. This is needed because some frameworks may elect
124     * to do this on their own way (Netty for instance using Nettybuffers)
125     *
126     * @param msg
127     * @return
128     * @throws Exception
129     */
130    public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
131        Header header = new Header();
132        Properties props = new Properties();
133        HashMap<Symbol, Object> daMap = null;
134        HashMap<Symbol, Object> maMap = null;
135        HashMap apMap = null;
136        Section body = null;
137        HashMap footerMap = null;
138        if (msg instanceof BytesMessage) {
139            BytesMessage m = (BytesMessage) msg;
140            byte data[] = new byte[(int) m.getBodyLength()];
141            m.readBytes(data);
142            m.reset(); // Need to reset after readBytes or future readBytes
143                       // calls (ex: redeliveries) will fail and return -1
144            body = new Data(new Binary(data));
145        }
146        if (msg instanceof TextMessage) {
147            body = new AmqpValue(((TextMessage) msg).getText());
148        }
149        if (msg instanceof MapMessage) {
150            final HashMap<String, Object> map = new HashMap<String, Object>();
151            final MapMessage m = (MapMessage) msg;
152            final Enumeration<String> names = m.getMapNames();
153            while (names.hasMoreElements()) {
154                String key = names.nextElement();
155                map.put(key, m.getObject(key));
156            }
157            body = new AmqpValue(map);
158        }
159        if (msg instanceof StreamMessage) {
160            ArrayList<Object> list = new ArrayList<Object>();
161            final StreamMessage m = (StreamMessage) msg;
162            try {
163                while (true) {
164                    list.add(m.readObject());
165                }
166            } catch (MessageEOFException e) {
167            }
168            body = new AmqpSequence(list);
169        }
170        if (msg instanceof ObjectMessage) {
171            body = new AmqpValue(((ObjectMessage) msg).getObject());
172        }
173
174        header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
175        header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
176        if (msg.getJMSType() != null) {
177            props.setSubject(msg.getJMSType());
178        }
179        if (msg.getJMSMessageID() != null) {
180            ActiveMQMessage amqMsg = (ActiveMQMessage) msg;
181
182            MessageId msgId = amqMsg.getMessageId();
183            if (msgId.getTextView() != null) {
184                try {
185                    props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
186                } catch (AmqpProtocolException e) {
187                    props.setMessageId(msgId.getTextView().toString());
188                }
189            } else {
190                props.setMessageId(msgId.toString());
191            }
192        }
193        if (msg.getJMSDestination() != null) {
194            props.setTo(vendor.toAddress(msg.getJMSDestination()));
195            if (maMap == null) {
196                maMap = new HashMap<Symbol, Object>();
197            }
198            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
199
200            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
201            maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
202        }
203        if (msg.getJMSReplyTo() != null) {
204            props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
205            if (maMap == null) {
206                maMap = new HashMap<Symbol, Object>();
207            }
208            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
209
210            // Deprecated: used by legacy QPid AMQP 1.0 JMS client
211            maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
212        }
213        if (msg.getJMSCorrelationID() != null) {
214            props.setCorrelationId(msg.getJMSCorrelationID());
215        }
216        if (msg.getJMSExpiration() != 0) {
217            long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
218            if (ttl < 0) {
219                ttl = 1;
220            }
221            header.setTtl(new UnsignedInteger((int) ttl));
222
223            props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
224        }
225        if (msg.getJMSTimestamp() != 0) {
226            props.setCreationTime(new Date(msg.getJMSTimestamp()));
227        }
228
229        final Enumeration<String> keys = msg.getPropertyNames();
230        while (keys.hasMoreElements()) {
231            String key = keys.nextElement();
232            if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
233                // skip..
234            } else if (key.equals(firstAcquirerKey)) {
235                header.setFirstAcquirer(msg.getBooleanProperty(key));
236            } else if (key.startsWith("JMSXDeliveryCount")) {
237                // The AMQP delivery-count field only includes prior failed delivery attempts,
238                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
239                int amqpDeliveryCount = msg.getIntProperty(key) - 1;
240                if (amqpDeliveryCount > 0) {
241                    header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
242                }
243            } else if (key.startsWith("JMSXUserID")) {
244                String value = msg.getStringProperty(key);
245                props.setUserId(new Binary(value.getBytes("UTF-8")));
246            } else if (key.startsWith("JMSXGroupID")) {
247                String value = msg.getStringProperty(key);
248                props.setGroupId(value);
249                if (apMap == null) {
250                    apMap = new HashMap();
251                }
252                apMap.put(key, value);
253            } else if (key.startsWith("JMSXGroupSeq")) {
254                UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
255                props.setGroupSequence(value);
256                if (apMap == null) {
257                    apMap = new HashMap();
258                }
259                apMap.put(key, value);
260            } else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
261                if (daMap == null) {
262                    daMap = new HashMap<Symbol, Object>();
263                }
264                String name = key.substring(prefixDeliveryAnnotationsKey.length());
265                daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
266            } else if (key.startsWith(prefixMessageAnnotationsKey)) {
267                if (maMap == null) {
268                    maMap = new HashMap<Symbol, Object>();
269                }
270                String name = key.substring(prefixMessageAnnotationsKey.length());
271                maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
272            } else if (key.equals(contentTypeKey)) {
273                props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
274            } else if (key.equals(contentEncodingKey)) {
275                props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
276            } else if (key.equals(replyToGroupIDKey)) {
277                props.setReplyToGroupId(msg.getStringProperty(key));
278            } else if (key.startsWith(prefixFooterKey)) {
279                if (footerMap == null) {
280                    footerMap = new HashMap();
281                }
282                String name = key.substring(prefixFooterKey.length());
283                footerMap.put(name, msg.getObjectProperty(key));
284            } else {
285                if (apMap == null) {
286                    apMap = new HashMap();
287                }
288                apMap.put(key, msg.getObjectProperty(key));
289            }
290        }
291
292        MessageAnnotations ma = null;
293        if (maMap != null) {
294            ma = new MessageAnnotations(maMap);
295        }
296        DeliveryAnnotations da = null;
297        if (daMap != null) {
298            da = new DeliveryAnnotations(daMap);
299        }
300        ApplicationProperties ap = null;
301        if (apMap != null) {
302            ap = new ApplicationProperties(apMap);
303        }
304        Footer footer = null;
305        if (footerMap != null) {
306            footer = new Footer(footerMap);
307        }
308
309        return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
310    }
311
312    private static byte destinationType(Destination destination) {
313        if (destination instanceof Queue) {
314            if (destination instanceof TemporaryQueue) {
315                return TEMP_QUEUE_TYPE;
316            } else {
317                return QUEUE_TYPE;
318            }
319        } else if (destination instanceof Topic) {
320            if (destination instanceof TemporaryTopic) {
321                return TEMP_TOPIC_TYPE;
322            } else {
323                return TOPIC_TYPE;
324            }
325        }
326
327        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
328    }
329
330    // Used by legacy QPid AMQP 1.0 JMS client.
331    @Deprecated
332    private static String destinationAttributes(Destination destination) {
333        if (destination instanceof Queue) {
334            if (destination instanceof TemporaryQueue) {
335                return LEGACY_TEMP_QUEUE_TYPE;
336            } else {
337                return LEGACY_QUEUE_TYPE;
338            }
339        } else if (destination instanceof Topic) {
340            if (destination instanceof TemporaryTopic) {
341                return LEGACY_TEMP_TOPIC_TYPE;
342            } else {
343                return LEGACY_TOPIC_TYPE;
344            }
345        }
346
347        throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
348    }
349}