001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.net.MalformedURLException; 020import java.util.Enumeration; 021 022import javax.jms.*; 023import javax.jms.Message; 024import org.apache.activemq.blob.BlobDownloader; 025import org.apache.activemq.command.*; 026 027/** 028 * A helper class for converting normal JMS interfaces into ActiveMQ specific 029 * ones. 030 * 031 * 032 */ 033public final class ActiveMQMessageTransformation { 034 035 private ActiveMQMessageTransformation() { 036 } 037 038 /** 039 * Creates a an available JMS message from another provider. 040 * 041 * @param destination - Destination to be converted into ActiveMQ's 042 * implementation. 043 * @return ActiveMQDestination - ActiveMQ's implementation of the 044 * destination. 045 * @throws JMSException if an error occurs 046 */ 047 public static ActiveMQDestination transformDestination(Destination destination) throws JMSException { 048 ActiveMQDestination activeMQDestination = null; 049 050 if (destination != null) { 051 if (destination instanceof ActiveMQDestination) { 052 return (ActiveMQDestination)destination; 053 054 } else { 055 if (destination instanceof TemporaryQueue) { 056 activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName()); 057 } else if (destination instanceof TemporaryTopic) { 058 activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName()); 059 } else if (destination instanceof Queue) { 060 activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName()); 061 } else if (destination instanceof Topic) { 062 activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName()); 063 } 064 } 065 } 066 067 return activeMQDestination; 068 } 069 070 /** 071 * Creates a fast shallow copy of the current ActiveMQMessage or creates a 072 * whole new message instance from an available JMS message from another 073 * provider. 074 * 075 * @param message - Message to be converted into ActiveMQ's implementation. 076 * @param connection 077 * @return ActiveMQMessage - ActiveMQ's implementation object of the 078 * message. 079 * @throws JMSException if an error occurs 080 */ 081 public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection) 082 throws JMSException { 083 if (message instanceof ActiveMQMessage) { 084 return (ActiveMQMessage)message; 085 086 } else { 087 ActiveMQMessage activeMessage = null; 088 089 if (message instanceof BytesMessage) { 090 BytesMessage bytesMsg = (BytesMessage)message; 091 bytesMsg.reset(); 092 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 093 msg.setConnection(connection); 094 try { 095 for (;;) { 096 // Reads a byte from the message stream until the stream 097 // is empty 098 msg.writeByte(bytesMsg.readByte()); 099 } 100 } catch (MessageEOFException e) { 101 // if an end of message stream as expected 102 } catch (JMSException e) { 103 } 104 105 activeMessage = msg; 106 } else if (message instanceof MapMessage) { 107 MapMessage mapMsg = (MapMessage)message; 108 ActiveMQMapMessage msg = new ActiveMQMapMessage(); 109 msg.setConnection(connection); 110 Enumeration iter = mapMsg.getMapNames(); 111 112 while (iter.hasMoreElements()) { 113 String name = iter.nextElement().toString(); 114 msg.setObject(name, mapMsg.getObject(name)); 115 } 116 117 activeMessage = msg; 118 } else if (message instanceof ObjectMessage) { 119 ObjectMessage objMsg = (ObjectMessage)message; 120 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 121 msg.setConnection(connection); 122 msg.setObject(objMsg.getObject()); 123 msg.storeContent(); 124 activeMessage = msg; 125 } else if (message instanceof StreamMessage) { 126 StreamMessage streamMessage = (StreamMessage)message; 127 streamMessage.reset(); 128 ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); 129 msg.setConnection(connection); 130 Object obj = null; 131 132 try { 133 while ((obj = streamMessage.readObject()) != null) { 134 msg.writeObject(obj); 135 } 136 } catch (MessageEOFException e) { 137 // if an end of message stream as expected 138 } catch (JMSException e) { 139 } 140 141 activeMessage = msg; 142 } else if (message instanceof TextMessage) { 143 TextMessage textMsg = (TextMessage)message; 144 ActiveMQTextMessage msg = new ActiveMQTextMessage(); 145 msg.setConnection(connection); 146 msg.setText(textMsg.getText()); 147 activeMessage = msg; 148 } else if (message instanceof BlobMessage) { 149 BlobMessage blobMessage = (BlobMessage)message; 150 ActiveMQBlobMessage msg = new ActiveMQBlobMessage(); 151 msg.setConnection(connection); 152 if (connection != null){ 153 msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy())); 154 } 155 try { 156 msg.setURL(blobMessage.getURL()); 157 } catch (MalformedURLException e) { 158 159 } 160 activeMessage = msg; 161 } else { 162 activeMessage = new ActiveMQMessage(); 163 activeMessage.setConnection(connection); 164 } 165 166 copyProperties(message, activeMessage); 167 168 return activeMessage; 169 } 170 } 171 172 /** 173 * Copies the standard JMS and user defined properties from the givem 174 * message to the specified message 175 * 176 * @param fromMessage the message to take the properties from 177 * @param toMessage the message to add the properties to 178 * @throws JMSException 179 */ 180 public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException { 181 toMessage.setJMSMessageID(fromMessage.getJMSMessageID()); 182 toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); 183 toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo())); 184 toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination())); 185 toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode()); 186 toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered()); 187 toMessage.setJMSType(fromMessage.getJMSType()); 188 toMessage.setJMSExpiration(fromMessage.getJMSExpiration()); 189 toMessage.setJMSPriority(fromMessage.getJMSPriority()); 190 toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp()); 191 192 Enumeration propertyNames = fromMessage.getPropertyNames(); 193 194 while (propertyNames.hasMoreElements()) { 195 String name = propertyNames.nextElement().toString(); 196 Object obj = fromMessage.getObjectProperty(name); 197 toMessage.setObjectProperty(name, obj); 198 } 199 } 200}