001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.DataOutputStream; 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.Map; 023 024import javax.jms.Destination; 025import javax.jms.JMSException; 026 027import org.apache.activemq.command.ActiveMQBytesMessage; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.ActiveMQMessage; 030import org.apache.activemq.command.ActiveMQTextMessage; 031import org.apache.activemq.util.ByteArrayOutputStream; 032import org.apache.activemq.util.ByteSequence; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Implements ActiveMQ 4.0 translations 038 */ 039public class LegacyFrameTranslator implements FrameTranslator { 040 041 private static final Logger LOG = LoggerFactory.getLogger(LegacyFrameTranslator.class); 042 043 @Override 044 public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { 045 final Map<?, ?> headers = command.getHeaders(); 046 final ActiveMQMessage msg; 047 /* 048 * To reduce the complexity of this method perhaps a Chain of Responsibility 049 * would be a better implementation 050 */ 051 if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) { 052 String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE); 053 if(intendedType.equalsIgnoreCase("text")){ 054 ActiveMQTextMessage text = new ActiveMQTextMessage(); 055 try { 056 ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4); 057 DataOutputStream data = new DataOutputStream(bytes); 058 data.writeInt(command.getContent().length); 059 data.write(command.getContent()); 060 text.setContent(bytes.toByteSequence()); 061 data.close(); 062 } catch (Throwable e) { 063 throw new ProtocolException("Text could not bet set: " + e, false, e); 064 } 065 msg = text; 066 } else if(intendedType.equalsIgnoreCase("bytes")) { 067 ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage(); 068 byteMessage.writeBytes(command.getContent()); 069 msg = byteMessage; 070 } else { 071 throw new ProtocolException("Unsupported message type '"+intendedType+"'",false); 072 } 073 }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { 074 headers.remove(Stomp.Headers.CONTENT_LENGTH); 075 ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); 076 bm.writeBytes(command.getContent()); 077 msg = bm; 078 } else { 079 ActiveMQTextMessage text = new ActiveMQTextMessage(); 080 try { 081 ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4); 082 DataOutputStream data = new DataOutputStream(bytes); 083 data.writeInt(command.getContent().length); 084 data.write(command.getContent()); 085 text.setContent(bytes.toByteSequence()); 086 data.close(); 087 } catch (Throwable e) { 088 throw new ProtocolException("Text could not bet set: " + e, false, e); 089 } 090 msg = text; 091 } 092 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); 093 return msg; 094 } 095 096 @Override 097 public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { 098 StompFrame command = new StompFrame(); 099 command.setAction(Stomp.Responses.MESSAGE); 100 Map<String, String> headers = new HashMap<String, String>(25); 101 command.setHeaders(headers); 102 103 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); 104 105 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 106 107 if (!message.isCompressed() && message.getContent() != null) { 108 ByteSequence msgContent = message.getContent(); 109 if (msgContent.getLength() > 4) { 110 byte[] content = new byte[msgContent.getLength() - 4]; 111 System.arraycopy(msgContent.data, 4, content, 0, content.length); 112 command.setContent(content); 113 } 114 } else { 115 ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); 116 String messageText = msg.getText(); 117 if (messageText != null) { 118 command.setContent(msg.getText().getBytes("UTF-8")); 119 } 120 } 121 122 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 123 124 ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); 125 msg.setReadOnlyBody(true); 126 byte[] data = new byte[(int)msg.getBodyLength()]; 127 msg.readBytes(data); 128 129 headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length)); 130 command.setContent(data); 131 } 132 133 return command; 134 } 135 136 @Override 137 public String convertDestination(ProtocolConverter converter, Destination d) { 138 if (d == null) { 139 return null; 140 } 141 ActiveMQDestination activeMQDestination = (ActiveMQDestination)d; 142 String physicalName = activeMQDestination.getPhysicalName(); 143 144 String rc = converter.getCreatedTempDestinationName(activeMQDestination); 145 if( rc!=null ) { 146 return rc; 147 } 148 149 StringBuilder buffer = new StringBuilder(); 150 if (activeMQDestination.isQueue()) { 151 if (activeMQDestination.isTemporary()) { 152 buffer.append("/remote-temp-queue/"); 153 } else { 154 buffer.append("/queue/"); 155 } 156 } else { 157 if (activeMQDestination.isTemporary()) { 158 buffer.append("/remote-temp-topic/"); 159 } else { 160 buffer.append("/topic/"); 161 } 162 } 163 buffer.append(physicalName); 164 return buffer.toString(); 165 } 166 167 @Override 168 public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException { 169 if (name == null) { 170 return null; 171 } 172 173 // in case of space padding by a client we trim for the initial detection, on fallback use 174 // the un-trimmed value. 175 String originalName = name; 176 name = name.trim(); 177 178 String[] destinations = name.split(","); 179 if (destinations == null || destinations.length == 0) { 180 destinations = new String[] { name }; 181 } 182 183 StringBuilder destinationBuilder = new StringBuilder(); 184 for (int i = 0; i < destinations.length; ++i) { 185 String destinationName = destinations[i]; 186 187 if (destinationName.startsWith("/queue/")) { 188 destinationName = destinationName.substring("/queue/".length(), destinationName.length()); 189 destinationBuilder.append(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + destinationName); 190 } else if (destinationName.startsWith("/topic/")) { 191 destinationName = destinationName.substring("/topic/".length(), destinationName.length()); 192 destinationBuilder.append(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + destinationName); 193 } else if (destinationName.startsWith("/remote-temp-queue/")) { 194 destinationName = destinationName.substring("/remote-temp-queue/".length(), destinationName.length()); 195 destinationBuilder.append(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + destinationName); 196 } else if (destinationName.startsWith("/remote-temp-topic/")) { 197 destinationName = destinationName.substring("/remote-temp-topic/".length(), destinationName.length()); 198 destinationBuilder.append(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + destinationName); 199 } else if (destinationName.startsWith("/temp-queue/")) { 200 ActiveMQDestination converted = converter.createTempDestination(destinationName, false); 201 destinationBuilder.append(converted.getQualifiedName()); 202 } else if (destinationName.startsWith("/temp-topic/")) { 203 ActiveMQDestination converted = converter.createTempDestination(destinationName, true); 204 destinationBuilder.append(converted.getQualifiedName()); 205 } else { 206 if (forceFallback) { 207 String fallbackName = destinationName; 208 if (destinationName.length() == 1) { 209 // Use the original non-trimmed name instead 210 fallbackName = originalName; 211 } 212 213 try { 214 ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(fallbackName); 215 if (fallback != null) { 216 destinationBuilder.append(fallback.getQualifiedName()); 217 } 218 } catch (JMSException e) { 219 throw new ProtocolException("Illegal destination name: [" + fallbackName + "] -- ActiveMQ STOMP destinations " 220 + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e); 221 } 222 } else { 223 throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations " 224 + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); 225 } 226 } 227 228 if (i < destinations.length - 1) { 229 destinationBuilder.append(","); 230 } 231 } 232 233 LOG.trace("New Composite Destination name: {}", destinationBuilder); 234 235 return ActiveMQDestination.createDestination(destinationBuilder.toString(), ActiveMQDestination.QUEUE_TYPE); 236 } 237}