001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.io.File;
020import java.io.Serializable;
021import java.math.BigDecimal;
022import java.math.BigInteger;
023import java.util.Date;
024import java.util.LinkedHashMap;
025import java.util.Map;
026
027import org.apache.camel.Exchange;
028import org.apache.camel.RuntimeExchangeException;
029import org.apache.camel.WrappedFile;
030import org.apache.camel.util.ObjectHelper;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Holder object for sending an exchange over a remote wire as a serialized object.
036 * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint.
037 * <br/>
038 * <b>Note:</b> Message body of type {@link File} or {@link WrappedFile} is <b>not</b> supported and
039 * a {@link RuntimeExchangeException} is thrown.
040 * <br/>
041 * As opposed to normal usage where only the body part of the exchange is transferred over the wire,
042 * this holder object serializes the following fields over the wire:
043 * <ul>
044 * <li>exchangeId</li>
045 * <li>in body</li>
046 * <li>out body</li>
047 * <li>fault body </li>
048 * <li>exchange properties</li>
049 * <li>exception</li>
050 * </ul>
051 * And the following headers is transferred if their values are of primitive types, String or Number based.
052 * <ul>
053 * <li>in headers</li>
054 * <li>out headers</li>
055 * <li>fault headers</li>
056 * </ul>
057 * The body is serialized and stored as serialized bytes. The header and exchange properties only include
058 * primitive, String, and Number types (and Exception types for exchange properties). Any other type is skipped.
059 * <br/>
060 * Any message body object that is not serializable will be skipped and Camel will log this at <tt>WARN</tt> level.
061 * And any message header values that is not a primitive value will be skipped and Camel will log this at <tt>DEBUG</tt> level.
062 *
063 * @version 
064 */
065public class DefaultExchangeHolder implements Serializable {
066
067    private static final long serialVersionUID = 2L;
068    private static final Logger LOG = LoggerFactory.getLogger(DefaultExchangeHolder.class);
069
070    private String exchangeId;
071    private Object inBody;
072    private Object outBody;
073    private Boolean inFaultFlag = Boolean.FALSE;
074    private Boolean outFaultFlag = Boolean.FALSE;
075    private Map<String, Object> inHeaders;
076    private Map<String, Object> outHeaders;
077    private Map<String, Object> properties;
078    private Exception exception;
079
080    /**
081     * Creates a payload object with the information from the given exchange.
082     *
083     * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
084     * @return the holder object with information copied form the exchange
085     */
086    public static DefaultExchangeHolder marshal(Exchange exchange) {
087        return marshal(exchange, true, false);
088    }
089
090    /**
091     * Creates a payload object with the information from the given exchange.
092     *
093     * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
094     * @param includeProperties whether or not to include exchange properties
095     * @return the holder object with information copied form the exchange
096     */
097    public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
098        ObjectHelper.notNull(exchange, "exchange");
099
100        // we do not support files
101        Object body = exchange.getIn().getBody();
102        if (body instanceof WrappedFile || body instanceof File) {
103            throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange);
104        }
105
106        DefaultExchangeHolder payload = new DefaultExchangeHolder();
107
108        payload.exchangeId = exchange.getExchangeId();
109        payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
110        payload.safeSetInHeaders(exchange, false);
111        if (exchange.hasOut()) {
112            payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody());
113            payload.outFaultFlag = exchange.getOut().isFault();
114            payload.safeSetOutHeaders(exchange, false);
115        } else {
116            payload.inFaultFlag = exchange.getIn().isFault();
117        }
118        if (includeProperties) {
119            payload.safeSetProperties(exchange, false);
120        }
121        payload.exception = exchange.getException();
122
123        return payload;
124    }
125    
126    /**
127     * Creates a payload object with the information from the given exchange.
128     *
129     * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
130     * @param includeProperties whether or not to include exchange properties
131     * @param allowSerializedHeaders whether or not to include serialized headers
132     * @return the holder object with information copied form the exchange
133     */
134    public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, boolean allowSerializedHeaders) {
135        ObjectHelper.notNull(exchange, "exchange");
136
137        // we do not support files
138        Object body = exchange.getIn().getBody();
139        if (body instanceof WrappedFile || body instanceof File) {
140            throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange);
141        }
142
143        DefaultExchangeHolder payload = new DefaultExchangeHolder();
144
145        payload.exchangeId = exchange.getExchangeId();
146        payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
147        payload.safeSetInHeaders(exchange, allowSerializedHeaders);
148        if (exchange.hasOut()) {
149            payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody());
150            payload.outFaultFlag = exchange.getOut().isFault();
151            payload.safeSetOutHeaders(exchange, allowSerializedHeaders);
152        } else {
153            payload.inFaultFlag = exchange.getIn().isFault();
154        }
155        if (includeProperties) {
156            payload.safeSetProperties(exchange, allowSerializedHeaders);
157        }
158        payload.exception = exchange.getException();
159
160        return payload;
161    }
162
163    /**
164     * Transfers the information from the payload to the exchange.
165     *
166     * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt>
167     * @param payload  the payload with the values, must <b>not</b> be <tt>null</tt>
168     */
169    public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
170        ObjectHelper.notNull(exchange, "exchange");
171        ObjectHelper.notNull(payload, "payload");
172
173        exchange.setExchangeId(payload.exchangeId);
174        exchange.getIn().setBody(payload.inBody);
175        if (payload.inHeaders != null) {
176            exchange.getIn().setHeaders(payload.inHeaders);
177        }
178        if (payload.inFaultFlag != null) {
179            exchange.getIn().setFault(payload.inFaultFlag);
180        }
181        if (payload.outBody != null) {
182            exchange.getOut().setBody(payload.outBody);
183            if (payload.outHeaders != null) {
184                exchange.getOut().setHeaders(payload.outHeaders);
185            }
186            if (payload.outFaultFlag != null) {
187                exchange.getOut().setFault(payload.outFaultFlag);
188            }
189        }
190        if (payload.properties != null) {
191            for (String key : payload.properties.keySet()) {
192                exchange.setProperty(key, payload.properties.get(key));
193            }
194        }
195        exchange.setException(payload.exception);
196    }
197
198    /**
199     * Adds a property to the payload.
200     * <p/>
201     * This can be done in special situations where additional information must be added which was not provided
202     * from the source.
203     *
204     * @param payload the serialized payload
205     * @param key the property key to add
206     * @param property the property value to add
207     */
208    public static void addProperty(DefaultExchangeHolder payload, String key, Serializable property) {
209        if (key == null || property == null) {
210            return;
211        }
212        if (payload.properties == null) {
213            payload.properties = new LinkedHashMap<String, Object>();
214        }
215        payload.properties.put(key, property);
216    }
217
218    public String toString() {
219        StringBuilder sb = new StringBuilder("DefaultExchangeHolder[exchangeId=").append(exchangeId);
220        sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
221        sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders);
222        sb.append(", properties=").append(properties).append(", exception=").append(exception);
223        return sb.append(']').toString();
224    }
225
226    private Map<String, Object> safeSetInHeaders(Exchange exchange, boolean allowSerializedHeaders) {
227        if (exchange.getIn().hasHeaders()) {
228            Map<String, Object> map = checkValidHeaderObjects("in headers", exchange, exchange.getIn().getHeaders(), allowSerializedHeaders);
229            if (map != null && !map.isEmpty()) {
230                inHeaders = new LinkedHashMap<String, Object>(map);
231            }
232        }
233        return null;
234    }
235
236    private Map<String, Object> safeSetOutHeaders(Exchange exchange, boolean allowSerializedHeaders) {
237        if (exchange.hasOut() && exchange.getOut().hasHeaders()) {
238            Map<String, Object> map = checkValidHeaderObjects("out headers", exchange, exchange.getOut().getHeaders(), allowSerializedHeaders);
239            if (map != null && !map.isEmpty()) {
240                outHeaders = new LinkedHashMap<String, Object>(map);
241            }
242        }
243        return null;
244    }
245
246    private Map<String, Object> safeSetProperties(Exchange exchange, boolean allowSerializedHeaders) {
247        if (exchange.hasProperties()) {
248            Map<String, Object> map = checkValidExchangePropertyObjects("properties", exchange, exchange.getProperties(), allowSerializedHeaders);
249            if (map != null && !map.isEmpty()) {
250                properties = new LinkedHashMap<String, Object>(map);
251            }
252        }
253        return null;
254    }
255
256    private static Object checkSerializableBody(String type, Exchange exchange, Object object) {
257        if (object == null) {
258            return null;
259        }
260
261        Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, object);
262        if (converted != null) {
263            return converted;
264        } else {
265            LOG.warn("Exchange " + type + " containing object: " + object + " of type: " + object.getClass().getCanonicalName() + " cannot be serialized, it will be excluded by the holder.");
266            return null;
267        }
268    }
269
270    private static Map<String, Object> checkValidHeaderObjects(String type, Exchange exchange, Map<String, Object> map, boolean allowSerializedHeaders) {
271        if (map == null) {
272            return null;
273        }
274
275        Map<String, Object> result = new LinkedHashMap<String, Object>();
276        for (Map.Entry<String, Object> entry : map.entrySet()) {
277
278            // silently skip any values which is null
279            if (entry.getValue() == null) {
280                continue;
281            }
282
283            Object value = getValidHeaderValue(entry.getKey(), entry.getValue(), allowSerializedHeaders);
284            if (value != null) {
285                Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value);
286                if (converted != null) {
287                    result.put(entry.getKey(), converted);
288                } else {
289                    logCannotSerializeObject(type, entry.getKey(), entry.getValue());
290                }
291            } else {
292                logInvalidHeaderValue(type, entry.getKey(), entry.getValue());
293            }
294        }
295
296        return result;
297    }
298
299    private static Map<String, Object> checkValidExchangePropertyObjects(String type, Exchange exchange, Map<String, Object> map, boolean allowSerializedHeaders) {
300        if (map == null) {
301            return null;
302        }
303
304        Map<String, Object> result = new LinkedHashMap<String, Object>();
305        for (Map.Entry<String, Object> entry : map.entrySet()) {
306
307            // silently skip any values which is null
308            if (entry.getValue() == null) {
309                continue;
310            }
311
312            Object value = getValidExchangePropertyValue(entry.getKey(), entry.getValue(), allowSerializedHeaders);
313            if (value != null) {
314                Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value);
315                if (converted != null) {
316                    result.put(entry.getKey(), converted);
317                } else {
318                    logCannotSerializeObject(type, entry.getKey(), entry.getValue());
319                }
320            } else {
321                logInvalidExchangePropertyValue(type, entry.getKey(), entry.getValue());
322            }
323        }
324
325        return result;
326    }
327
328    /**
329     * We only want to store header values of primitive and String related types.
330     * <p/>
331     * This default implementation will allow:
332     * <ul>
333     *   <li>any primitives and their counter Objects (Integer, Double etc.)</li>
334     *   <li>String and any other literals, Character, CharSequence</li>
335     *   <li>Boolean</li>
336     *   <li>Number</li>
337     *   <li>java.util.Date</li>
338     * </ul>
339     * 
340     * We make possible store serialized headers by the boolean field allowSerializedHeaders
341     * 
342     * @param headerName   the header name
343     * @param headerValue  the header value
344     * @param allowSerializedHeaders  the header value
345     * @return  the value to use, <tt>null</tt> to ignore this header
346     */
347    protected static Object getValidHeaderValue(String headerName, Object headerValue, boolean allowSerializedHeaders) {
348        if (headerValue instanceof String) {
349            return headerValue;
350        } else if (headerValue instanceof BigInteger) {
351            return headerValue;
352        } else if (headerValue instanceof BigDecimal) {
353            return headerValue;
354        } else if (headerValue instanceof Number) {
355            return headerValue;
356        } else if (headerValue instanceof Character) {
357            return headerValue;
358        } else if (headerValue instanceof CharSequence) {
359            return headerValue.toString();
360        } else if (headerValue instanceof Boolean) {
361            return headerValue;
362        } else if (headerValue instanceof Date) {
363            return headerValue;
364        } else if (allowSerializedHeaders) {
365            if (headerValue instanceof Serializable) {
366                return headerValue;
367            }
368        }
369        return null;
370    }
371
372    /**
373     * We only want to store exchange property values of primitive and String related types, and
374     * as well any caught exception that Camel routing engine has caught.
375     * <p/>
376     * This default implementation will allow the same values as {@link #getValidHeaderValue(String, Object)}
377     * and in addition any value of type {@link Throwable}.
378     *
379     * @param propertyName   the property name
380     * @param propertyValue  the property value
381     * @return  the value to use, <tt>null</tt> to ignore this header
382     */
383    protected static Object getValidExchangePropertyValue(String propertyName, Object propertyValue, boolean allowSerializedHeaders) {
384        // for exchange properties we also allow exception to be transferred so people can store caught exception
385        if (propertyValue instanceof Throwable) {
386            return propertyValue;
387        }
388        return getValidHeaderValue(propertyName, propertyValue, allowSerializedHeaders);
389    }
390
391    private static void logCannotSerializeObject(String type, String key, Object value) {
392        if (key.startsWith("Camel")) {
393            // log Camel at DEBUG level
394            if (LOG.isDebugEnabled()) {
395                LOG.debug("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder.",
396                          new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)});
397            }
398        } else {
399            // log regular at WARN level
400            LOG.warn("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder.",
401                     new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)});
402        }
403    }
404
405    private static void logInvalidHeaderValue(String type, String key, Object value) {
406        if (LOG.isDebugEnabled()) {
407            LOG.debug("Exchange {} containing key: {} with object: {} of type: {} is not valid header type, it will be excluded by the holder.",
408                      new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)});
409        }
410    }
411
412    private static void logInvalidExchangePropertyValue(String type, String key, Object value) {
413        if (LOG.isDebugEnabled()) {
414            LOG.debug("Exchange {} containing key: {} with object: {} of type: {} is not valid exchange property type, it will be excluded by the holder.",
415                      new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)});
416        }
417    }
418
419}