/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.message;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;

/**
 * Inbound transformer that maps the body section of a decoded AMQP message
 * onto the JMS message type that matches it:
 * <ul>
 * <li>no body: a plain {@link Message}</li>
 * <li>{@link Data}: a {@link BytesMessage}</li>
 * <li>{@link AmqpSequence}: a {@link StreamMessage}</li>
 * <li>{@link AmqpValue}: a {@link TextMessage}, {@link BytesMessage},
 * {@link StreamMessage}, {@link MapMessage} or {@link ObjectMessage},
 * chosen by the runtime type of the wrapped value</li>
 * </ul>
 */
public class JMSMappingInboundTransformer extends InboundTransformer {

    /**
     * Creates a transformer that builds its JMS messages through the given vendor.
     *
     * @param vendor
     *        the vendor handed to the superclass constructor.
     */
    public JMSMappingInboundTransformer(JMSVendor vendor) {
        super(vendor);
    }

    /**
     * @return the {@code TRANSFORMER_JMS} constant as this transformer's name.
     */
    @Override
    public String getTransformerName() {
        return TRANSFORMER_JMS;
    }

    /**
     * @return a new {@link AMQPNativeInboundTransformer} that shares this
     *         transformer's vendor.
     */
    @Override
    public InboundTransformer getFallbackTransformer() {
        return new AMQPNativeInboundTransformer(getVendor());
    }

    /**
     * Decodes the given AMQP message and converts it into a JMS message whose
     * concrete type is selected from the AMQP body section.
     *
     * After the body has been mapped, the delivery mode, priority and
     * expiration are set from this transformer's defaults,
     * {@code populateMessage} is invoked with the new message and the decoded
     * AMQP message, and the vendor-prefixed {@code MESSAGE_FORMAT} and
     * {@code NATIVE} properties are set.
     *
     * @param amqpMessage
     *        the encoded AMQP message to decode and convert.
     *
     * @return the JMS message created from the AMQP message.
     *
     * @throws RuntimeException
     *         if the body section is not one of the handled section types.
     * @throws Exception
     *         if decoding or populating the message fails.
     */
    @SuppressWarnings({ "unchecked" })
    @Override
    public Message transform(EncodedMessage amqpMessage) throws Exception {
        org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();

        Message rc;
        final Section body = amqp.getBody();
        if (body == null) {
            rc = vendor.createMessage();
        } else if (body instanceof Data) {
            Binary d = ((Data) body).getValue();
            BytesMessage m = vendor.createBytesMessage();
            m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
            rc = m;
        } else if (body instanceof AmqpSequence) {
            AmqpSequence sequence = (AmqpSequence) body;
            StreamMessage m = vendor.createStreamMessage();
            for (Object item : sequence.getValue()) {
                m.writeObject(item);
            }
            rc = m;
        } else if (body instanceof AmqpValue) {
            Object value = ((AmqpValue) body).getValue();
            if (value == null) {
                // A null value yields an ObjectMessage with no payload set. This must be
                // part of the else-if chain, otherwise the null case also reaches the
                // final branch and builds a second, redundant ObjectMessage.
                rc = vendor.createObjectMessage();
            } else if (value instanceof String) {
                TextMessage m = vendor.createTextMessage();
                m.setText((String) value);
                rc = m;
            } else if (value instanceof Binary) {
                Binary d = (Binary) value;
                BytesMessage m = vendor.createBytesMessage();
                m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
                rc = m;
            } else if (value instanceof List) {
                StreamMessage m = vendor.createStreamMessage();
                for (Object item : (List<Object>) value) {
                    m.writeObject(item);
                }
                rc = m;
            } else if (value instanceof Map) {
                MapMessage m = vendor.createMapMessage();
                // NOTE(review): keys are assumed to be Strings; a non-String key would
                // fail with a ClassCastException here - confirm against senders.
                final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
                for (Map.Entry<String, Object> entry : set) {
                    m.setObject(entry.getKey(), entry.getValue());
                }
                rc = m;
            } else {
                // Any other value type is carried as the payload of an ObjectMessage;
                // the cast fails for values that are not Serializable.
                ObjectMessage m = vendor.createObjectMessage();
                m.setObject((Serializable) value);
                rc = m;
            }
        } else {
            throw new RuntimeException("Unexpected body type: " + body.getClass());
        }

        // Apply the configured defaults before populateMessage runs.
        // NOTE(review): populateMessage presumably overrides these from the AMQP
        // header/properties when present - confirm in InboundTransformer.
        rc.setJMSDeliveryMode(defaultDeliveryMode);
        rc.setJMSPriority(defaultPriority);
        rc.setJMSExpiration(defaultTtl);

        populateMessage(rc, amqp);

        rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
        return rc;
    }
}