001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.message;
018
019import java.nio.ByteBuffer;
020
021import javax.jms.BytesMessage;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageFormatException;
025
026import org.apache.qpid.proton.amqp.UnsignedInteger;
027import org.apache.qpid.proton.amqp.messaging.Header;
028import org.apache.qpid.proton.codec.CompositeWritableBuffer;
029import org.apache.qpid.proton.codec.DroppingWritableBuffer;
030import org.apache.qpid.proton.codec.WritableBuffer;
031import org.apache.qpid.proton.message.ProtonJMessage;
032
033public class AMQPNativeOutboundTransformer extends OutboundTransformer {
034
035    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
036        super(vendor);
037    }
038
039    @Override
040    public EncodedMessage transform(Message msg) throws Exception {
041        if (msg == null || !(msg instanceof BytesMessage)) {
042            return null;
043        }
044
045        try {
046            if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) {
047                return null;
048            }
049        } catch (MessageFormatException e) {
050            return null;
051        }
052
053        return transform(this, (BytesMessage) msg);
054    }
055
056    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
057        long messageFormat;
058        try {
059            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
060        } catch (MessageFormatException e) {
061            return null;
062        }
063        byte data[] = new byte[(int) msg.getBodyLength()];
064        int dataSize = data.length;
065        msg.readBytes(data);
066        msg.reset();
067
068        try {
069            int count = msg.getIntProperty("JMSXDeliveryCount");
070            if (count > 1) {
071
072                // decode...
073                ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
074                int offset = 0;
075                int len = data.length;
076                while (len > 0) {
077                    final int decoded = amqp.decode(data, offset, len);
078                    assert decoded > 0 : "Make progress decoding the message";
079                    offset += decoded;
080                    len -= decoded;
081                }
082
083                // Update the DeliveryCount header...
084                // The AMQP delivery-count field only includes prior failed delivery attempts,
085                // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
086                if (amqp.getHeader() == null) {
087                    amqp.setHeader(new Header());
088                }
089
090                amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
091
092                // Re-encode...
093                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
094                final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
095                int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
096                if (overflow.position() > 0) {
097                    buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
098                    c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
099                }
100                data = buffer.array();
101                dataSize = c;
102            }
103        } catch (JMSException e) {
104        }
105
106        return new EncodedMessage(messageFormat, data, 0, dataSize);
107    }
108}