001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.Exchange;
022import org.apache.camel.Message;
023import org.apache.camel.impl.DefaultMessage;
024import org.apache.camel.spi.IdAware;
025import org.apache.camel.support.ServiceSupport;
026import org.apache.camel.util.AsyncProcessorHelper;
027import org.apache.camel.util.ExchangeHelper;
028import org.apache.camel.util.IOHelper;
029import org.apache.camel.util.ObjectHelper;
030
031/**
032 * A processor which converts the payload of the input message to be of the given type
033 * <p/>
034 * If the conversion fails an {@link org.apache.camel.InvalidPayloadException} is thrown.
035 *
036 * @version 
037 */
038public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor, IdAware {
039    private String id;
040    private final Class<?> type;
041    private final String charset;
042
043    public ConvertBodyProcessor(Class<?> type) {
044        ObjectHelper.notNull(type, "type", this);
045        this.type = type;
046        this.charset = null;
047    }
048
049    public ConvertBodyProcessor(Class<?> type, String charset) {
050        ObjectHelper.notNull(type, "type", this);
051        this.type = type;
052        this.charset = IOHelper.normalizeCharset(charset);
053    }
054
055    @Override
056    public String toString() {
057        return "convertBodyTo[" + type.getCanonicalName() + "]";
058    }
059
060    public String getId() {
061        return id;
062    }
063
064    public void setId(String id) {
065        this.id = id;
066    }
067
068    public void process(Exchange exchange) throws Exception {
069        AsyncProcessorHelper.process(this, exchange);
070    }
071
072    @Override
073    public boolean process(Exchange exchange, AsyncCallback callback) {
074        boolean out = exchange.hasOut();
075        Message old = out ? exchange.getOut() : exchange.getIn();
076
077        if (old.getBody() == null) {
078            // only convert if the is a body
079            callback.done(true);
080            return true;
081        }
082
083        if (charset != null) {
084            // override existing charset with configured charset as that is what the user
085            // have explicit configured and expects to be used
086            exchange.setProperty(Exchange.CHARSET_NAME, charset);
087        }
088        // use mandatory conversion
089        Object value;
090        try {
091            value = old.getMandatoryBody(type);
092        } catch (Throwable e) {
093            exchange.setException(e);
094            callback.done(true);
095            return true;
096        }
097
098        // create a new message container so we do not drag specialized message objects along
099        // but that is only needed if the old message is a specialized message
100        boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class));
101
102        if (copyNeeded) {
103            Message msg = new DefaultMessage();
104            msg.copyFrom(old);
105            msg.setBody(value);
106
107            // replace message on exchange
108            ExchangeHelper.replaceMessage(exchange, msg, false);
109        } else {
110            // no copy needed so set replace value directly
111            old.setBody(value);
112        }
113
114        // remove charset when we are done as we should not propagate that,
115        // as that can lead to double converting later on
116        if (charset != null) {
117            exchange.removeProperty(Exchange.CHARSET_NAME);
118        }
119
120        callback.done(true);
121        return true;
122    }
123
124    public Class<?> getType() {
125        return type;
126    }
127
128    public String getCharset() {
129        return charset;
130    }
131
132    @Override
133    protected void doStart() throws Exception {
134        // noop
135    }
136
137    @Override
138    protected void doStop() throws Exception {
139        // noop
140    }
141}