001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage; 020import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame; 021 022import java.io.IOException; 023import java.io.Serializable; 024import java.io.StringReader; 025import java.io.StringWriter; 026import java.util.HashMap; 027import java.util.Locale; 028import java.util.Map; 029 030import javax.jms.JMSException; 031 032import org.apache.activemq.advisory.AdvisorySupport; 033import org.apache.activemq.broker.BrokerContext; 034import org.apache.activemq.broker.BrokerContextAware; 035import org.apache.activemq.command.ActiveMQMapMessage; 036import org.apache.activemq.command.ActiveMQMessage; 037import org.apache.activemq.command.ActiveMQObjectMessage; 038import org.apache.activemq.command.DataStructure; 039import org.apache.activemq.transport.stomp.Stomp.Headers; 040import org.apache.activemq.transport.stomp.Stomp.Responses; 041import org.apache.activemq.transport.stomp.Stomp.Transformations; 042import org.codehaus.jettison.mapped.Configuration; 043import org.fusesource.hawtbuf.UTF8Buffer; 044 045import com.thoughtworks.xstream.XStream; 046import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter; 047import com.thoughtworks.xstream.io.HierarchicalStreamReader; 048import com.thoughtworks.xstream.io.HierarchicalStreamWriter; 049import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; 050import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; 051import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; 052import com.thoughtworks.xstream.io.xml.XppReader; 053import com.thoughtworks.xstream.io.xml.xppdom.XppFactory; 054 055/** 056 * Frame translator implementation that uses XStream to convert messages to and 057 * from XML and JSON 058 */ 059public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware { 060 061 XStream xStream = null; 062 BrokerContext brokerContext; 063 064 @Override 065 public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { 066 Map<String, String> headers = command.getHeaders(); 067 ActiveMQMessage msg; 068 String transformation = headers.get(Headers.TRANSFORMATION); 069 if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) { 070 msg = super.convertFrame(converter, command); 071 } else { 072 HierarchicalStreamReader in; 073 074 try { 075 String text = new String(command.getContent(), "UTF-8"); 076 switch (Transformations.getValue(transformation)) { 077 case JMS_OBJECT_XML: 078 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); 079 msg = createObjectMessage(in); 080 break; 081 case JMS_OBJECT_JSON: 082 in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); 083 msg = createObjectMessage(in); 084 break; 085 case JMS_MAP_XML: 086 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); 087 msg = createMapMessage(in); 088 break; 089 case JMS_MAP_JSON: 090 in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); 091 msg = createMapMessage(in); 092 break; 093 default: 094 throw new Exception("Unknown transformation: " + transformation); 095 } 096 } catch (Throwable e) { 097 command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage()); 098 msg = super.convertFrame(converter, command); 099 } 100 } 101 102 copyStandardHeadersFromFrameToMessage(converter, command, msg, this); 103 return msg; 104 } 105 106 @Override 107 public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { 108 109 StompFrame command = new StompFrame(); 110 command.setAction(Responses.MESSAGE); 111 Map<String, String> headers = new HashMap<String, String>(25); 112 command.setHeaders(headers); 113 114 copyStandardHeadersFromMessageToFrame(converter, message, command, this); 115 116 String transformation = headers.get(Headers.TRANSFORMATION); 117 118 if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { 119 120 if (Transformations.JMS_XML.equals(transformation)) { 121 headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); 122 } else if (Transformations.JMS_JSON.equals(transformation)) { 123 headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString()); 124 } 125 126 if (!headers.containsKey(Headers.TRANSFORMATION)) { 127 headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); 128 } 129 130 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); 131 command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); 132 133 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 134 135 if (Transformations.JMS_XML.equals(transformation)) { 136 headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); 137 } else if (Transformations.JMS_JSON.equals(transformation)) { 138 headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString()); 139 } 140 141 if (!headers.containsKey(Headers.TRANSFORMATION)) { 142 headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); 143 } 144 145 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 146 command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); 147 148 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { 149 150 if (Transformations.JMS_XML.equals(transformation)) { 151 headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString()); 152 } else if (Transformations.JMS_JSON.equals(transformation)) { 153 headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); 154 } 155 156 if (!headers.containsKey(Headers.TRANSFORMATION)) { 157 headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); 158 } 159 160 String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION)); 161 command.setContent(body.getBytes("UTF-8")); 162 163 } else { 164 command = super.convertMessage(converter, message); 165 } 166 167 return command; 168 } 169 170 /** 171 * Marshal the Object to a string using XML or JSON encoding 172 * 173 * @param object 174 * the object to marshal 175 * @param transformation 176 * the transformation to apply to the object. 177 * 178 * @returns the marshaled form of the given object, in JSON or XML. 179 * 180 * @throws JMSException if an error occurs during the marshal operation. 181 */ 182 protected String marshall(Serializable object, String transformation) throws JMSException { 183 StringWriter buffer = new StringWriter(); 184 HierarchicalStreamWriter out; 185 if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) { 186 out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer); 187 } else { 188 out = new PrettyPrintWriter(buffer); 189 } 190 getXStream().marshal(object, out); 191 return buffer.toString(); 192 } 193 194 protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException { 195 ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); 196 Object obj = getXStream().unmarshal(in); 197 objMsg.setObject((Serializable) obj); 198 return objMsg; 199 } 200 201 @SuppressWarnings("unchecked") 202 protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { 203 ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); 204 Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in); 205 for (String key : map.keySet()) { 206 mapMsg.setObject(key, map.get(key)); 207 } 208 return mapMsg; 209 } 210 211 protected String marshallAdvisory(final DataStructure ds, String transformation) { 212 213 StringWriter buffer = new StringWriter(); 214 HierarchicalStreamWriter out; 215 if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) { 216 out = new JettisonMappedXmlDriver().createWriter(buffer); 217 } else { 218 out = new PrettyPrintWriter(buffer); 219 } 220 221 XStream xstream = getXStream(); 222 xstream.setMode(XStream.NO_REFERENCES); 223 xstream.aliasPackage("", "org.apache.activemq.command"); 224 xstream.marshal(ds, out); 225 return buffer.toString(); 226 } 227 228 // Properties 229 // ------------------------------------------------------------------------- 230 public XStream getXStream() { 231 if (xStream == null) { 232 xStream = createXStream(); 233 } 234 return xStream; 235 } 236 237 public void setXStream(XStream xStream) { 238 this.xStream = xStream; 239 } 240 241 // Implementation methods 242 // ------------------------------------------------------------------------- 243 @SuppressWarnings("unchecked") 244 protected XStream createXStream() { 245 XStream xstream = null; 246 if (brokerContext != null) { 247 Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class); 248 for (XStream bean : beans.values()) { 249 if (bean != null) { 250 xstream = bean; 251 break; 252 } 253 } 254 } 255 256 if (xstream == null) { 257 xstream = XStreamSupport.createXStream(); 258 xstream.ignoreUnknownElements(); 259 } 260 261 // For any object whose elements contains an UTF8Buffer instance instead 262 // of a String type we map it to String both in and out such that we don't 263 // marshal UTF8Buffers out 264 xstream.registerConverter(new AbstractSingleValueConverter() { 265 266 @Override 267 public Object fromString(String str) { 268 return str; 269 } 270 271 @SuppressWarnings("rawtypes") 272 @Override 273 public boolean canConvert(Class type) { 274 return type.equals(UTF8Buffer.class); 275 } 276 }); 277 278 xstream.alias("string", UTF8Buffer.class); 279 280 return xstream; 281 } 282 283 @Override 284 public void setBrokerContext(BrokerContext brokerContext) { 285 this.brokerContext = brokerContext; 286 } 287 288 @Override 289 public BrokerContext getBrokerContext() { 290 return this.brokerContext; 291 } 292 293 /** 294 * Return an Advisory message as a JSON formatted string 295 * 296 * @param ds 297 * the DataStructure instance that is being marshaled. 298 * 299 * @return the JSON marshaled form of the given DataStructure instance. 300 */ 301 protected String marshallAdvisory(final DataStructure ds) { 302 XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); 303 xstream.setMode(XStream.NO_REFERENCES); 304 xstream.aliasPackage("", "org.apache.activemq.command"); 305 return xstream.toXML(ds); 306 } 307}