001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.xstream;
018
019import java.io.IOException;
020import java.io.Reader;
021import com.thoughtworks.xstream.XStream;
022import org.apache.activemq.command.ConsumerInfo;
023
024import com.thoughtworks.xstream.converters.Converter;
025import com.thoughtworks.xstream.converters.ConverterLookup;
026import com.thoughtworks.xstream.converters.MarshallingContext;
027import com.thoughtworks.xstream.converters.UnmarshallingContext;
028import com.thoughtworks.xstream.io.HierarchicalStreamReader;
029import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
030import org.apache.activemq.command.MarshallAware;
031import org.apache.activemq.command.MessageDispatch;
032import org.apache.activemq.command.TransientInitializer;
033import org.apache.activemq.transport.stomp.XStreamSupport;
034import org.apache.activemq.transport.util.TextWireFormat;
035import org.apache.activemq.util.ByteSequence;
036import org.apache.activemq.wireformat.WireFormat;
037
038/**
039 * A {@link WireFormat} implementation which uses the <a
040 * href="http://xstream.codehaus.org/>XStream</a> library to marshall commands
041 * onto the wire
042 *
043 *
044 */
045public class XStreamWireFormat extends TextWireFormat {
046    private XStream xStream;
047    private int version;
048
049    @Override
050    public int getVersion() {
051        return version;
052    }
053
054    @Override
055    public void setVersion(int version) {
056        this.version = version;
057    }
058
059    public WireFormat copy() {
060        return new XStreamWireFormat();
061    }
062
063    @Override
064    public Object unmarshalText(String text) {
065        return getXStream().fromXML(text);
066    }
067
068    @Override
069    public Object unmarshalText(Reader reader) {
070        Object val = getXStream().fromXML(reader);
071        if (val instanceof TransientInitializer) {
072            ((TransientInitializer)val).initTransients();
073        }
074        return val;
075    }
076
077    @Override
078    public String marshalText(Object command) throws IOException {
079        if (command instanceof MarshallAware) {
080            ((MarshallAware)command).beforeMarshall(this);
081        } else if(command instanceof MessageDispatch) {
082            MessageDispatch dispatch = (MessageDispatch) command;
083            if (dispatch != null && dispatch.getMessage() != null) {
084                dispatch.getMessage().beforeMarshall(this);
085            }
086        }
087
088        return getXStream().toXML(command);
089    }
090
091    /**
092     * Can this wireformat process packets of this version
093     *
094     * @param version the version number to test
095     * @return true if can accept the version
096     */
097    public boolean canProcessWireFormatVersion(int version) {
098        return true;
099    }
100
101    /**
102     * @return the current version of this wire format
103     */
104    public int getCurrentWireFormatVersion() {
105        return 1;
106    }
107
108    // Properties
109    // -------------------------------------------------------------------------
110    public XStream getXStream() {
111        if (xStream == null) {
112            xStream = createXStream();
113            // make it work in OSGi env
114            xStream.setClassLoader(getClass().getClassLoader());
115        }
116        return xStream;
117    }
118
119    public void setXStream(XStream xStream) {
120        this.xStream = xStream;
121    }
122
123    // Implementation methods
124    // -------------------------------------------------------------------------
125    protected XStream createXStream() {
126        final XStream xstream = XStreamSupport.createXStream();
127        xstream.ignoreUnknownElements();
128        xstream.registerConverter(new Converter() {
129            final Converter delegate = xstream.getConverterLookup().lookupConverterForType(ByteSequence.class);
130            @Override
131            public void marshal(Object o, HierarchicalStreamWriter hierarchicalStreamWriter, MarshallingContext marshallingContext) {
132                ByteSequence byteSequence = (ByteSequence)o;
133                byteSequence.compact();
134                delegate.marshal(byteSequence, hierarchicalStreamWriter, marshallingContext);
135            }
136
137            @Override
138            public Object unmarshal(HierarchicalStreamReader hierarchicalStreamReader, UnmarshallingContext unmarshallingContext) {
139                return delegate.unmarshal(hierarchicalStreamReader, unmarshallingContext);
140            }
141
142            @Override
143            public boolean canConvert(Class aClass) {
144                return aClass == ByteSequence.class;
145            }
146        });
147        return xstream;
148    }
149
150}