001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.io.File; 020import java.io.Serializable; 021import java.math.BigDecimal; 022import java.math.BigInteger; 023import java.util.Date; 024import java.util.LinkedHashMap; 025import java.util.Map; 026 027import org.apache.camel.Exchange; 028import org.apache.camel.RuntimeExchangeException; 029import org.apache.camel.WrappedFile; 030import org.apache.camel.util.ObjectHelper; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Holder object for sending an exchange over a remote wire as a serialized object. 036 * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint. 037 * <br/> 038 * <b>Note:</b> Message body of type {@link File} or {@link WrappedFile} is <b>not</b> supported and 039 * a {@link RuntimeExchangeException} is thrown. 040 * <br/> 041 * As opposed to normal usage where only the body part of the exchange is transferred over the wire, 042 * this holder object serializes the following fields over the wire: 043 * <ul> 044 * <li>exchangeId</li> 045 * <li>in body</li> 046 * <li>out body</li> 047 * <li>fault body </li> 048 * <li>exchange properties</li> 049 * <li>exception</li> 050 * </ul> 051 * And the following headers is transferred if their values are of primitive types, String or Number based. 052 * <ul> 053 * <li>in headers</li> 054 * <li>out headers</li> 055 * <li>fault headers</li> 056 * </ul> 057 * The body is serialized and stored as serialized bytes. The header and exchange properties only include 058 * primitive, String, and Number types (and Exception types for exchange properties). Any other type is skipped. 059 * <br/> 060 * Any message body object that is not serializable will be skipped and Camel will log this at <tt>WARN</tt> level. 061 * And any message header values that is not a primitive value will be skipped and Camel will log this at <tt>DEBUG</tt> level. 062 * 063 * @version 064 */ 065public class DefaultExchangeHolder implements Serializable { 066 067 private static final long serialVersionUID = 2L; 068 private static final Logger LOG = LoggerFactory.getLogger(DefaultExchangeHolder.class); 069 070 private String exchangeId; 071 private Object inBody; 072 private Object outBody; 073 private Boolean inFaultFlag = Boolean.FALSE; 074 private Boolean outFaultFlag = Boolean.FALSE; 075 private Map<String, Object> inHeaders; 076 private Map<String, Object> outHeaders; 077 private Map<String, Object> properties; 078 private Exception exception; 079 080 /** 081 * Creates a payload object with the information from the given exchange. 082 * 083 * @param exchange the exchange, must <b>not</b> be <tt>null</tt> 084 * @return the holder object with information copied form the exchange 085 */ 086 public static DefaultExchangeHolder marshal(Exchange exchange) { 087 return marshal(exchange, true, false); 088 } 089 090 /** 091 * Creates a payload object with the information from the given exchange. 092 * 093 * @param exchange the exchange, must <b>not</b> be <tt>null</tt> 094 * @param includeProperties whether or not to include exchange properties 095 * @return the holder object with information copied form the exchange 096 */ 097 public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) { 098 ObjectHelper.notNull(exchange, "exchange"); 099 100 // we do not support files 101 Object body = exchange.getIn().getBody(); 102 if (body instanceof WrappedFile || body instanceof File) { 103 throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange); 104 } 105 106 DefaultExchangeHolder payload = new DefaultExchangeHolder(); 107 108 payload.exchangeId = exchange.getExchangeId(); 109 payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody()); 110 payload.safeSetInHeaders(exchange, false); 111 if (exchange.hasOut()) { 112 payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody()); 113 payload.outFaultFlag = exchange.getOut().isFault(); 114 payload.safeSetOutHeaders(exchange, false); 115 } else { 116 payload.inFaultFlag = exchange.getIn().isFault(); 117 } 118 if (includeProperties) { 119 payload.safeSetProperties(exchange, false); 120 } 121 payload.exception = exchange.getException(); 122 123 return payload; 124 } 125 126 /** 127 * Creates a payload object with the information from the given exchange. 128 * 129 * @param exchange the exchange, must <b>not</b> be <tt>null</tt> 130 * @param includeProperties whether or not to include exchange properties 131 * @param allowSerializedHeaders whether or not to include serialized headers 132 * @return the holder object with information copied form the exchange 133 */ 134 public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, boolean allowSerializedHeaders) { 135 ObjectHelper.notNull(exchange, "exchange"); 136 137 // we do not support files 138 Object body = exchange.getIn().getBody(); 139 if (body instanceof WrappedFile || body instanceof File) { 140 throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange); 141 } 142 143 DefaultExchangeHolder payload = new DefaultExchangeHolder(); 144 145 payload.exchangeId = exchange.getExchangeId(); 146 payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody()); 147 payload.safeSetInHeaders(exchange, allowSerializedHeaders); 148 if (exchange.hasOut()) { 149 payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody()); 150 payload.outFaultFlag = exchange.getOut().isFault(); 151 payload.safeSetOutHeaders(exchange, allowSerializedHeaders); 152 } else { 153 payload.inFaultFlag = exchange.getIn().isFault(); 154 } 155 if (includeProperties) { 156 payload.safeSetProperties(exchange, allowSerializedHeaders); 157 } 158 payload.exception = exchange.getException(); 159 160 return payload; 161 } 162 163 /** 164 * Transfers the information from the payload to the exchange. 165 * 166 * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt> 167 * @param payload the payload with the values, must <b>not</b> be <tt>null</tt> 168 */ 169 public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) { 170 ObjectHelper.notNull(exchange, "exchange"); 171 ObjectHelper.notNull(payload, "payload"); 172 173 exchange.setExchangeId(payload.exchangeId); 174 exchange.getIn().setBody(payload.inBody); 175 if (payload.inHeaders != null) { 176 exchange.getIn().setHeaders(payload.inHeaders); 177 } 178 if (payload.inFaultFlag != null) { 179 exchange.getIn().setFault(payload.inFaultFlag); 180 } 181 if (payload.outBody != null) { 182 exchange.getOut().setBody(payload.outBody); 183 if (payload.outHeaders != null) { 184 exchange.getOut().setHeaders(payload.outHeaders); 185 } 186 if (payload.outFaultFlag != null) { 187 exchange.getOut().setFault(payload.outFaultFlag); 188 } 189 } 190 if (payload.properties != null) { 191 for (String key : payload.properties.keySet()) { 192 exchange.setProperty(key, payload.properties.get(key)); 193 } 194 } 195 exchange.setException(payload.exception); 196 } 197 198 /** 199 * Adds a property to the payload. 200 * <p/> 201 * This can be done in special situations where additional information must be added which was not provided 202 * from the source. 203 * 204 * @param payload the serialized payload 205 * @param key the property key to add 206 * @param property the property value to add 207 */ 208 public static void addProperty(DefaultExchangeHolder payload, String key, Serializable property) { 209 if (key == null || property == null) { 210 return; 211 } 212 if (payload.properties == null) { 213 payload.properties = new LinkedHashMap<String, Object>(); 214 } 215 payload.properties.put(key, property); 216 } 217 218 public String toString() { 219 StringBuilder sb = new StringBuilder("DefaultExchangeHolder[exchangeId=").append(exchangeId); 220 sb.append("inBody=").append(inBody).append(", outBody=").append(outBody); 221 sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders); 222 sb.append(", properties=").append(properties).append(", exception=").append(exception); 223 return sb.append(']').toString(); 224 } 225 226 private Map<String, Object> safeSetInHeaders(Exchange exchange, boolean allowSerializedHeaders) { 227 if (exchange.getIn().hasHeaders()) { 228 Map<String, Object> map = checkValidHeaderObjects("in headers", exchange, exchange.getIn().getHeaders(), allowSerializedHeaders); 229 if (map != null && !map.isEmpty()) { 230 inHeaders = new LinkedHashMap<String, Object>(map); 231 } 232 } 233 return null; 234 } 235 236 private Map<String, Object> safeSetOutHeaders(Exchange exchange, boolean allowSerializedHeaders) { 237 if (exchange.hasOut() && exchange.getOut().hasHeaders()) { 238 Map<String, Object> map = checkValidHeaderObjects("out headers", exchange, exchange.getOut().getHeaders(), allowSerializedHeaders); 239 if (map != null && !map.isEmpty()) { 240 outHeaders = new LinkedHashMap<String, Object>(map); 241 } 242 } 243 return null; 244 } 245 246 private Map<String, Object> safeSetProperties(Exchange exchange, boolean allowSerializedHeaders) { 247 if (exchange.hasProperties()) { 248 Map<String, Object> map = checkValidExchangePropertyObjects("properties", exchange, exchange.getProperties(), allowSerializedHeaders); 249 if (map != null && !map.isEmpty()) { 250 properties = new LinkedHashMap<String, Object>(map); 251 } 252 } 253 return null; 254 } 255 256 private static Object checkSerializableBody(String type, Exchange exchange, Object object) { 257 if (object == null) { 258 return null; 259 } 260 261 Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, object); 262 if (converted != null) { 263 return converted; 264 } else { 265 LOG.warn("Exchange " + type + " containing object: " + object + " of type: " + object.getClass().getCanonicalName() + " cannot be serialized, it will be excluded by the holder."); 266 return null; 267 } 268 } 269 270 private static Map<String, Object> checkValidHeaderObjects(String type, Exchange exchange, Map<String, Object> map, boolean allowSerializedHeaders) { 271 if (map == null) { 272 return null; 273 } 274 275 Map<String, Object> result = new LinkedHashMap<String, Object>(); 276 for (Map.Entry<String, Object> entry : map.entrySet()) { 277 278 // silently skip any values which is null 279 if (entry.getValue() == null) { 280 continue; 281 } 282 283 Object value = getValidHeaderValue(entry.getKey(), entry.getValue(), allowSerializedHeaders); 284 if (value != null) { 285 Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value); 286 if (converted != null) { 287 result.put(entry.getKey(), converted); 288 } else { 289 logCannotSerializeObject(type, entry.getKey(), entry.getValue()); 290 } 291 } else { 292 logInvalidHeaderValue(type, entry.getKey(), entry.getValue()); 293 } 294 } 295 296 return result; 297 } 298 299 private static Map<String, Object> checkValidExchangePropertyObjects(String type, Exchange exchange, Map<String, Object> map, boolean allowSerializedHeaders) { 300 if (map == null) { 301 return null; 302 } 303 304 Map<String, Object> result = new LinkedHashMap<String, Object>(); 305 for (Map.Entry<String, Object> entry : map.entrySet()) { 306 307 // silently skip any values which is null 308 if (entry.getValue() == null) { 309 continue; 310 } 311 312 Object value = getValidExchangePropertyValue(entry.getKey(), entry.getValue(), allowSerializedHeaders); 313 if (value != null) { 314 Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value); 315 if (converted != null) { 316 result.put(entry.getKey(), converted); 317 } else { 318 logCannotSerializeObject(type, entry.getKey(), entry.getValue()); 319 } 320 } else { 321 logInvalidExchangePropertyValue(type, entry.getKey(), entry.getValue()); 322 } 323 } 324 325 return result; 326 } 327 328 /** 329 * We only want to store header values of primitive and String related types. 330 * <p/> 331 * This default implementation will allow: 332 * <ul> 333 * <li>any primitives and their counter Objects (Integer, Double etc.)</li> 334 * <li>String and any other literals, Character, CharSequence</li> 335 * <li>Boolean</li> 336 * <li>Number</li> 337 * <li>java.util.Date</li> 338 * </ul> 339 * 340 * We make possible store serialized headers by the boolean field allowSerializedHeaders 341 * 342 * @param headerName the header name 343 * @param headerValue the header value 344 * @param allowSerializedHeaders the header value 345 * @return the value to use, <tt>null</tt> to ignore this header 346 */ 347 protected static Object getValidHeaderValue(String headerName, Object headerValue, boolean allowSerializedHeaders) { 348 if (headerValue instanceof String) { 349 return headerValue; 350 } else if (headerValue instanceof BigInteger) { 351 return headerValue; 352 } else if (headerValue instanceof BigDecimal) { 353 return headerValue; 354 } else if (headerValue instanceof Number) { 355 return headerValue; 356 } else if (headerValue instanceof Character) { 357 return headerValue; 358 } else if (headerValue instanceof CharSequence) { 359 return headerValue.toString(); 360 } else if (headerValue instanceof Boolean) { 361 return headerValue; 362 } else if (headerValue instanceof Date) { 363 return headerValue; 364 } else if (allowSerializedHeaders) { 365 if (headerValue instanceof Serializable) { 366 return headerValue; 367 } 368 } 369 return null; 370 } 371 372 /** 373 * We only want to store exchange property values of primitive and String related types, and 374 * as well any caught exception that Camel routing engine has caught. 375 * <p/> 376 * This default implementation will allow the same values as {@link #getValidHeaderValue(String, Object)} 377 * and in addition any value of type {@link Throwable}. 378 * 379 * @param propertyName the property name 380 * @param propertyValue the property value 381 * @return the value to use, <tt>null</tt> to ignore this header 382 */ 383 protected static Object getValidExchangePropertyValue(String propertyName, Object propertyValue, boolean allowSerializedHeaders) { 384 // for exchange properties we also allow exception to be transferred so people can store caught exception 385 if (propertyValue instanceof Throwable) { 386 return propertyValue; 387 } 388 return getValidHeaderValue(propertyName, propertyValue, allowSerializedHeaders); 389 } 390 391 private static void logCannotSerializeObject(String type, String key, Object value) { 392 if (key.startsWith("Camel")) { 393 // log Camel at DEBUG level 394 if (LOG.isDebugEnabled()) { 395 LOG.debug("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder.", 396 new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); 397 } 398 } else { 399 // log regular at WARN level 400 LOG.warn("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder.", 401 new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); 402 } 403 } 404 405 private static void logInvalidHeaderValue(String type, String key, Object value) { 406 if (LOG.isDebugEnabled()) { 407 LOG.debug("Exchange {} containing key: {} with object: {} of type: {} is not valid header type, it will be excluded by the holder.", 408 new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); 409 } 410 } 411 412 private static void logInvalidExchangePropertyValue(String type, String key, Object value) { 413 if (LOG.isDebugEnabled()) { 414 LOG.debug("Exchange {} containing key: {} with object: {} of type: {} is not valid exchange property type, it will be excluded by the holder.", 415 new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); 416 } 417 } 418 419}