/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.message;

import java.nio.ByteBuffer;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;

import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.ProtonJMessage;

/**
 * Outbound transformer for JMS {@link BytesMessage}s that are flagged with the
 * vendor-prefixed {@code NATIVE} boolean property. The message body is handed
 * back as the payload of an {@link EncodedMessage}; on redelivery the body is
 * decoded as an AMQP message so that its header delivery-count can be updated
 * before it is re-encoded.
 */
public class AMQPNativeOutboundTransformer extends OutboundTransformer {

    /** Size in bytes of the first buffer tried when re-encoding a message. */
    private static final int INITIAL_ENCODE_BUFFER_SIZE = 1024 * 4;

    public AMQPNativeOutboundTransformer(JMSVendor vendor) {
        super(vendor);
    }

    /**
     * Transforms the given message if it is a {@link BytesMessage} whose
     * vendor-prefixed {@code NATIVE} boolean property is true.
     *
     * @param msg the JMS message to transform; may be null
     * @return the encoded message, or null when {@code msg} is null, is not a
     *         {@link BytesMessage}, is not flagged as native, or when reading
     *         the flag raises a {@link MessageFormatException}
     * @throws Exception if reading the message fails for any other reason
     */
    @Override
    public EncodedMessage transform(Message msg) throws Exception {
        // instanceof is false for null, so no separate null check is needed.
        if (!(msg instanceof BytesMessage)) {
            return null;
        }

        try {
            if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) {
                return null;
            }
        } catch (MessageFormatException e) {
            return null;
        }

        return transform(this, (BytesMessage) msg);
    }

    /**
     * Builds an {@link EncodedMessage} from the body of the given bytes message.
     * <p>
     * When {@code JMSXDeliveryCount} is greater than one, the body is decoded,
     * its header delivery-count is set to {@code JMSXDeliveryCount - 1} and the
     * message is re-encoded. That update is best effort: if the delivery count
     * cannot be read, or the body cannot be fully decoded, the original bytes
     * are returned unchanged.
     *
     * @param options transformer supplying the vendor property prefix
     * @param msg the bytes message whose body is the payload; it is reset
     *        after its body has been read
     * @return the encoded message, or null when the vendor-prefixed
     *         {@code MESSAGE_FORMAT} property cannot be read as a long
     * @throws JMSException if the message body cannot be read
     */
    static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
        long messageFormat;
        try {
            messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
        } catch (MessageFormatException e) {
            return null;
        }

        byte[] data = new byte[(int) msg.getBodyLength()];
        int dataSize = data.length;
        msg.readBytes(data);
        msg.reset();

        try {
            int count = msg.getIntProperty("JMSXDeliveryCount");
            if (count > 1) {
                ProtonJMessage amqp = decode(data);
                if (amqp != null) {
                    // Update the DeliveryCount header...
                    // The AMQP delivery-count field only includes prior failed delivery attempts,
                    // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
                    if (amqp.getHeader() == null) {
                        amqp.setHeader(new Header());
                    }
                    amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));

                    // Re-encode. Anything that does not fit in the first buffer is counted
                    // by the dropping buffer, which tells us how much larger the retry
                    // buffer has to be.
                    ByteBuffer buffer = ByteBuffer.wrap(new byte[INITIAL_ENCODE_BUFFER_SIZE]);
                    final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
                    int encodedSize = amqp.encode(
                        new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
                    if (overflow.position() > 0) {
                        buffer = ByteBuffer.wrap(new byte[INITIAL_ENCODE_BUFFER_SIZE + overflow.position()]);
                        encodedSize = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
                    }
                    data = buffer.array();
                    dataSize = encodedSize;
                }
            }
        } catch (JMSException ignored) {
            // Best effort: the delivery count could not be read, so forward the
            // original bytes without touching the delivery-count header.
        }

        return new EncodedMessage(messageFormat, data, 0, dataSize);
    }

    /**
     * Decodes the whole byte array into a single AMQP message.
     *
     * @param data the encoded message bytes
     * @return the decoded message, or null if a decode call consumed no bytes
     *         and the array could therefore not be fully processed
     */
    private static ProtonJMessage decode(byte[] data) {
        ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
        int offset = 0;
        int remaining = data.length;
        while (remaining > 0) {
            final int decoded = amqp.decode(data, offset, remaining);
            // An explicit check instead of an assert: assertions are normally
            // disabled at runtime, and without progress this loop would never end.
            if (decoded <= 0) {
                return null;
            }
            offset += decoded;
            remaining -= decoded;
        }
        return amqp;
    }
}