/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.command;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;

/**
 * {@link TextMessage} implementation whose body is a single {@link String}.
 * <p>
 * The body lives in one of two places: decoded in the {@link #text} field, or
 * encoded (optionally deflated) in the inherited message content.
 * {@link #getText()} moves it from the content into the field;
 * {@link #storeContent()} encodes the field into the content when no content
 * is present, and {@link #storeContentAndClear()} additionally nulls the field.
 * <p>
 * Methods that read {@link #text} take a local snapshot of the field first so
 * that a single call works with one consistent value even if the field is
 * cleared while the call is in progress.
 *
 * @openwire:marshaller code="28"
 *
 */
public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;

    /** Decoded message body; {@code null} when the body is absent or only held as content. */
    protected String text;

    /**
     * Creates a new {@code ActiveMQTextMessage} carrying the same state as this one.
     *
     * @return the copy
     */
    @Override
    public Message copy() {
        ActiveMQTextMessage copy = new ActiveMQTextMessage();
        copy(copy);
        return copy;
    }

    /**
     * Copies the inherited state and the text body into {@code copy}.
     *
     * @param copy the message to populate
     */
    private void copy(ActiveMQTextMessage copy) {
        // Read the field BEFORE super.copy(): storeContentAndClear() moves the
        // body from the field into the content and then nulls the field. If
        // that happened between the two copies, reading the field last could
        // leave the copy with neither content nor text. Reading it first means
        // the copy ends up with the text, the content, or both.
        String text = this.text;
        super.copy(copy);
        copy.text = text;
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the fixed MIME type string {@code "jms/text-message"}
     */
    @Override
    public String getJMSXMimeType() {
        return "jms/text-message";
    }

    /**
     * Replaces the message body with {@code text} and discards any
     * previously stored content.
     *
     * @param text the new body, may be {@code null}
     * @throws MessageNotWriteableException if {@code checkReadOnlyBody()} rejects the write
     */
    @Override
    public void setText(String text) throws MessageNotWriteableException {
        checkReadOnlyBody();
        this.text = text;
        setContent(null);
    }

    /**
     * Returns the message body, decoding it from the stored content on first
     * access. After a decode the content is cleared and the compressed flag
     * is reset, so the body is afterwards held only in {@link #text}.
     *
     * @return the body, or {@code null} if there is neither text nor content
     * @throws JMSException if the stored content cannot be decoded
     */
    @Override
    public String getText() throws JMSException {
        ByteSequence content = getContent();
        // Work on a snapshot (as storeContent(), getSize() and toString() do)
        // so the value returned is the one that was checked/decoded, even if
        // the field is nulled by storeContentAndClear() in the meantime.
        String text = this.text;
        if (text == null && content != null) {
            text = decodeContent(content);
            this.text = text;
            setContent(null);
            setCompressed(false);
        }
        return text;
    }

    /**
     * Decodes a marshalled body into a string, inflating it first when the
     * message is flagged as compressed.
     *
     * @param bodyAsBytes the encoded body, may be {@code null}
     * @return the decoded text, or {@code null} if {@code bodyAsBytes} is {@code null}
     * @throws JMSException wrapping any {@link IOException} raised while reading
     */
    private String decodeContent(ByteSequence bodyAsBytes) throws JMSException {
        if (bodyAsBytes == null) {
            return null;
        }
        InputStream is = new ByteArrayInputStream(bodyAsBytes);
        try {
            if (isCompressed()) {
                is = new InflaterInputStream(is);
            }
            DataInputStream dataIn = new DataInputStream(is);
            String decoded = MarshallingSupport.readUTF8(dataIn);
            dataIn.close();
            return decoded;
        } catch (IOException ioe) {
            throw JMSExceptionSupport.create(ioe);
        } finally {
            // Covers the failure path, where dataIn.close() above was not reached.
            try {
                is.close();
            } catch (IOException ignored) {
                // best-effort cleanup; the decode result or the original
                // failure is what matters to the caller
            }
        }
    }

    /**
     * Runs the inherited pre-marshalling step and then moves the text body
     * into the message content via {@link #storeContentAndClear()}.
     *
     * @param wireFormat the wire format passed through to the superclass
     * @throws IOException if the superclass step fails
     */
    @Override
    public void beforeMarshall(WireFormat wireFormat) throws IOException {
        super.beforeMarshall(wireFormat);
        storeContentAndClear();
    }

    /**
     * Encodes the text body into the content (see {@link #storeContent()})
     * and then drops the decoded {@link #text}.
     */
    @Override
    public void storeContentAndClear() {
        storeContent();
        text = null;
    }

    /**
     * Encodes {@link #text} into the message content if there is text and no
     * content yet. When the owning connection has compression enabled the
     * bytes are deflated and the message is flagged as compressed.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while encoding
     */
    @Override
    public void storeContent() {
        try {
            ByteSequence content = getContent();
            String text = this.text;
            if (content == null && text != null) {
                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
                OutputStream os = bytesOut;
                ActiveMQConnection connection = getConnection();
                if (connection != null && connection.isUseCompression()) {
                    compressed = true;
                    os = new DeflaterOutputStream(os);
                }
                DataOutputStream dataOut = new DataOutputStream(os);
                try {
                    MarshallingSupport.writeUTF8(dataOut, text);
                } finally {
                    // Always close: this finishes the deflater output (required
                    // before the bytes are read below) and releases the stream
                    // chain even if the write fails.
                    dataOut.close();
                }
                setContent(bytesOut.toByteSequence());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // see https://issues.apache.org/activemq/browse/AMQ-2103
    // and https://issues.apache.org/activemq/browse/AMQ-2966
    /**
     * Clears the inherited marshalled state and drops the decoded {@link #text}.
     *
     * @throws JMSException if the superclass fails to clear its state
     */
    @Override
    public void clearMarshalledState() throws JMSException {
        super.clearMarshalledState();
        this.text = null;
    }

    /**
     * Clears out the message body. Clearing a message's body does not clear its
     * header values or property entries.
     * <p>
     * If this message body was read-only, calling this method leaves the
     * message body in the same state as an empty body in a newly created
     * message.
     *
     * @throws JMSException if the JMS provider fails to clear the message body
     *                 due to some internal error.
     */
    @Override
    public void clearBody() throws JMSException {
        super.clearBody();
        this.text = null;
    }

    /**
     * Returns the message size. When no size has been recorded yet and the
     * body exists only as decoded text, a size is first computed from the
     * minimum message size, the marshalled properties length (if any) and two
     * bytes per character of text; the result is then obtained from the
     * superclass.
     *
     * @return the size as reported by the superclass
     */
    @Override
    public int getSize() {
        String text = this.text;
        if (size == 0 && content == null && text != null) {
            size = getMinimumMessageSize();
            if (marshalledProperties != null) {
                size += marshalledProperties.getLength();
            }
            size += text.length() * 2;
        }
        return super.getSize();
    }

    /**
     * Returns the superclass string form with the {@code text} field overridden
     * by the body passed through {@code MarshallingSupport.truncate64}. The
     * body is decoded from the content for display if necessary, without
     * changing the message state. Falls back to the plain superclass form when
     * there is no body or it cannot be decoded.
     *
     * @return a printable description of this message
     */
    @Override
    public String toString() {
        try {
            String text = this.text;
            if (text == null) {
                text = decodeContent(getContent());
            }
            if (text != null) {
                text = MarshallingSupport.truncate64(text);
                HashMap<String, Object> overrideFields = new HashMap<String, Object>();
                overrideFields.put("text", text);
                return super.toString(overrideFields);
            }
        } catch (JMSException ignored) {
            // Deliberately ignored: toString() must not fail because the body
            // is undecodable; fall through to the default representation.
        }
        return super.toString();
    }
}