001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.util;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import java.io.Reader;
024import java.io.Writer;
025import java.util.Date;
026import java.util.List;
027import java.util.Map;
028import java.util.TreeMap;
029
030import javax.xml.transform.Source;
031
032import org.apache.camel.BytesSource;
033import org.apache.camel.Exchange;
034import org.apache.camel.Message;
035import org.apache.camel.MessageHistory;
036import org.apache.camel.StreamCache;
037import org.apache.camel.StringSource;
038import org.apache.camel.WrappedFile;
039import org.apache.camel.spi.ExchangeFormatter;
040import org.apache.camel.spi.HeaderFilterStrategy;
041
042/**
043 * Some helper methods when working with {@link org.apache.camel.Message}.
044 * 
045 * @version
046 */
047public final class MessageHelper {
048
049    private static final String MESSAGE_HISTORY_HEADER = "%-20s %-20s %-80s %-12s";
050    private static final String MESSAGE_HISTORY_OUTPUT = "[%-18.18s] [%-18.18s] [%-78.78s] [%10.10s]";
051
052    /**
053     * Utility classes should not have a public constructor.
054     */
055    private MessageHelper() {
056    }
057
058    /**
059     * Extracts the given body and returns it as a String, that can be used for
060     * logging etc.
061     * <p/>
062     * Will handle stream based bodies wrapped in StreamCache.
063     * 
064     * @param message the message with the body
065     * @return the body as String, can return <tt>null</null> if no body
066     */
067    public static String extractBodyAsString(Message message) {
068        if (message == null) {
069            return null;
070        }
071
072        // optimize if the body is a String type already
073        Object body = message.getBody();
074        if (body instanceof String) {
075            return (String) body;
076        }
077
078        // we need to favor using stream cache so the body can be re-read later
079        StreamCache newBody = message.getBody(StreamCache.class);
080        if (newBody != null) {
081            message.setBody(newBody);
082        }
083
084        Object answer = message.getBody(String.class);
085        if (answer == null) {
086            answer = message.getBody();
087        }
088
089        if (newBody != null) {
090            // Reset the InputStreamCache
091            newBody.reset();
092        }
093
094        return answer != null ? answer.toString() : null;
095    }
096
097    /**
098     * Gets the given body class type name as a String.
099     * <p/>
100     * Will skip java.lang. for the build in Java types.
101     * 
102     * @param message the message with the body
103     * @return the body type name as String, can return
104     *         <tt>null</null> if no body
105     */
106    public static String getBodyTypeName(Message message) {
107        if (message == null) {
108            return null;
109        }
110        String answer = ObjectHelper.classCanonicalName(message.getBody());
111        if (answer != null && answer.startsWith("java.lang.")) {
112            return answer.substring(10);
113        }
114        return answer;
115    }
116
117    /**
118     * If the message body contains a {@link StreamCache} instance, reset the
119     * cache to enable reading from it again.
120     * 
121     * @param message the message for which to reset the body
122     */
123    public static void resetStreamCache(Message message) {
124        if (message == null) {
125            return;
126        }
127        Object body = message.getBody();
128        if (body != null && body instanceof StreamCache) {
129            ((StreamCache) body).reset();
130        }
131    }
132
133    /**
134     * Returns the MIME content type on the message or <tt>null</tt> if none
135     * defined
136     */
137    public static String getContentType(Message message) {
138        return message.getHeader(Exchange.CONTENT_TYPE, String.class);
139    }
140
141    /**
142     * Returns the MIME content encoding on the message or <tt>null</tt> if none
143     * defined
144     */
145    public static String getContentEncoding(Message message) {
146        return message.getHeader(Exchange.CONTENT_ENCODING, String.class);
147    }
148
149    /**
150     * Extracts the body for logging purpose.
151     * <p/>
152     * Will clip the body if its too big for logging. Will prepend the message
153     * with <tt>Message: </tt>
154     * 
155     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_STREAMS
156     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_MAX_CHARS
157     * @param message the message
158     * @return the logging message
159     */
160    public static String extractBodyForLogging(Message message) {
161        return extractBodyForLogging(message, "Message: ");
162    }
163
164    /**
165     * Extracts the value for logging purpose.
166     * <p/>
167     * Will clip the value if its too big for logging.
168     *
169     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_STREAMS
170     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_MAX_CHARS
171     * @param value   the value
172     * @param message the message
173     * @return the logging message
174     */
175    public static String extractValueForLogging(Object value, Message message) {
176        boolean streams = false;
177        if (message.getExchange() != null) {
178            String property = message.getExchange().getContext().getProperty(Exchange.LOG_DEBUG_BODY_STREAMS);
179            if (property != null) {
180                streams = message.getExchange().getContext().getTypeConverter().convertTo(Boolean.class, message.getExchange(), property);
181            }
182        }
183
184        // default to 1000 chars
185        int maxChars = 1000;
186
187        if (message.getExchange() != null) {
188            String property = message.getExchange().getContext().getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS);
189            if (property != null) {
190                maxChars = message.getExchange().getContext().getTypeConverter().convertTo(Integer.class, property);
191            }
192        }
193
194        return extractValueForLogging(value, message, "", streams, false, maxChars);
195    }
196
197    /**
198     * Extracts the body for logging purpose.
199     * <p/>
200     * Will clip the body if its too big for logging.
201     *
202     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_STREAMS
203     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_MAX_CHARS
204     * @param message the message
205     * @param prepend a message to prepend
206     * @return the logging message
207     */
208    public static String extractBodyForLogging(Message message, String prepend) {
209        boolean streams = false;
210        if (message.getExchange() != null) {
211            String property = message.getExchange().getContext().getProperty(Exchange.LOG_DEBUG_BODY_STREAMS);
212            if (property != null) {
213                streams = message.getExchange().getContext().getTypeConverter().convertTo(Boolean.class, message.getExchange(), property);
214            }
215        }
216
217        // default to 1000 chars
218        int maxChars = 1000;
219
220        if (message.getExchange() != null) {
221            String property = message.getExchange().getContext().getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS);
222            if (property != null) {
223                maxChars = message.getExchange().getContext().getTypeConverter().convertTo(Integer.class, property);
224            }
225        }
226
227        return extractBodyForLogging(message, prepend, streams, false, maxChars);
228    }
229
230    /**
231     * Extracts the body for logging purpose.
232     * <p/>
233     * Will clip the body if its too big for logging.
234     * 
235     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_MAX_CHARS
236     * @param message the message
237     * @param prepend a message to prepend
238     * @param allowStreams whether or not streams is allowed
239     * @param allowFiles whether or not files is allowed (currently not in use)
240     * @param maxChars limit to maximum number of chars. Use 0 for not limit, and -1 for turning logging message body off.
241     * @return the logging message
242     */
243    public static String extractBodyForLogging(Message message, String prepend, boolean allowStreams, boolean allowFiles, int maxChars) {
244        return extractValueForLogging(message.getBody(), message, prepend, allowStreams, allowFiles, maxChars);
245    }
246
247    /**
248     * Extracts the value for logging purpose.
249     * <p/>
250     * Will clip the value if its too big for logging.
251     *
252     * @see org.apache.camel.Exchange#LOG_DEBUG_BODY_MAX_CHARS
253     * @param obj     the value
254     * @param message the message
255     * @param prepend a message to prepend
256     * @param allowStreams whether or not streams is allowed
257     * @param allowFiles whether or not files is allowed (currently not in use)
258     * @param maxChars limit to maximum number of chars. Use 0 for not limit, and -1 for turning logging message body off.
259     * @return the logging message
260     */
261    public static String extractValueForLogging(Object obj, Message message, String prepend, boolean allowStreams, boolean allowFiles, int maxChars) {
262        if (maxChars < 0) {
263            return prepend + "[Body is not logged]";
264        }
265
266        if (obj == null) {
267            return prepend + "[Body is null]";
268        }
269
270        if (!allowStreams) {
271            if (obj instanceof Source && !(obj instanceof StringSource || obj instanceof BytesSource)) {
272                // for Source its only StringSource or BytesSource that is okay as they are memory based
273                // all other kinds we should not touch the body
274                return prepend + "[Body is instance of java.xml.transform.Source]";
275            } else if (obj instanceof StreamCache) {
276                return prepend + "[Body is instance of org.apache.camel.StreamCache]";
277            } else if (obj instanceof InputStream) {
278                return prepend + "[Body is instance of java.io.InputStream]";
279            } else if (obj instanceof OutputStream) {
280                return prepend + "[Body is instance of java.io.OutputStream]";
281            } else if (obj instanceof Reader) {
282                return prepend + "[Body is instance of java.io.Reader]";
283            } else if (obj instanceof Writer) {
284                return prepend + "[Body is instance of java.io.Writer]";
285            } else if (obj instanceof WrappedFile || obj instanceof File) {
286                if (!allowFiles) {
287                    return prepend + "[Body is file based: " + obj + "]";
288                }
289            }
290        }
291
292        if (!allowFiles) {
293            if (obj instanceof WrappedFile || obj instanceof File) {
294                return prepend + "[Body is file based: " + obj + "]";
295            }
296        }
297
298        // is the body a stream cache or input stream
299        StreamCache cache = null;
300        InputStream is = null;
301        if (obj instanceof StreamCache) {
302            cache = (StreamCache)obj;
303            is = null;
304        } else if (obj instanceof InputStream) {
305            cache = null;
306            is = (InputStream) obj;
307        }
308
309        // grab the message body as a string
310        String body = null;
311        if (message.getExchange() != null) {
312            try {
313                body = message.getExchange().getContext().getTypeConverter().tryConvertTo(String.class, message.getExchange(), obj);
314            } catch (Throwable e) {
315                // ignore as the body is for logging purpose
316            }
317        }
318        if (body == null) {
319            try {
320                body = obj.toString();
321            } catch (Throwable e) {
322                // ignore as the body is for logging purpose
323            }
324        }
325
326        // reset stream cache after use
327        if (cache != null) {
328            cache.reset();
329        } else if (is != null && is.markSupported()) {
330            try {
331                is.reset();
332            } catch (IOException e) {
333                // ignore
334            }
335        }
336
337        if (body == null) {
338            return prepend + "[Body is null]";
339        }
340
341        // clip body if length enabled and the body is too big
342        if (maxChars > 0 && body.length() > maxChars) {
343            body = body.substring(0, maxChars) + "... [Body clipped after " + maxChars + " chars, total length is " + body.length() + "]";
344        }
345
346        return prepend + body;
347    }
348
349    /**
350     * Dumps the message as a generic XML structure.
351     * 
352     * @param message the message
353     * @return the XML
354     */
355    public static String dumpAsXml(Message message) {
356        return dumpAsXml(message, true);
357    }
358
359    /**
360     * Dumps the message as a generic XML structure.
361     * 
362     * @param message the message
363     * @param includeBody whether or not to include the message body
364     * @return the XML
365     */
366    public static String dumpAsXml(Message message, boolean includeBody) {
367        return dumpAsXml(message, includeBody, 0);
368    }
369
370    /**
371     * Dumps the message as a generic XML structure.
372     *
373     * @param message the message
374     * @param includeBody whether or not to include the message body
375     * @param indent number of spaces to indent
376     * @return the XML
377     */
378    public static String dumpAsXml(Message message, boolean includeBody, int indent) {
379        return dumpAsXml(message, includeBody, indent, false, true, 128 * 1024);
380    }
381
382    /**
383     * Dumps the message as a generic XML structure.
384     *
385     * @param message the message
386     * @param includeBody whether or not to include the message body
387     * @param indent number of spaces to indent
388     * @param allowStreams whether to include message body if they are stream based
389     * @param allowFiles whether to include message body if they are file based
390     * @param maxChars clip body after maximum chars (to avoid very big messages). Use 0 or negative value to not limit at all.
391     * @return the XML
392     */
393    public static String dumpAsXml(Message message, boolean includeBody, int indent, boolean allowStreams, boolean allowFiles, int maxChars) {
394        StringBuilder sb = new StringBuilder();
395
396        StringBuilder prefix = new StringBuilder();
397        for (int i = 0; i < indent; i++) {
398            prefix.append(" ");
399        }
400
401        // include exchangeId as attribute on the <message> tag
402        sb.append(prefix);
403        sb.append("<message exchangeId=\"").append(message.getExchange().getExchangeId()).append("\">\n");
404
405        // headers
406        if (message.hasHeaders()) {
407            sb.append(prefix);
408            sb.append("  <headers>\n");
409            // sort the headers so they are listed A..Z
410            Map<String, Object> headers = new TreeMap<String, Object>(message.getHeaders());
411            for (Map.Entry<String, Object> entry : headers.entrySet()) {
412                Object value = entry.getValue();
413                String type = ObjectHelper.classCanonicalName(value);
414                sb.append(prefix);
415                sb.append("    <header key=\"").append(entry.getKey()).append("\"");
416                if (type != null) {
417                    sb.append(" type=\"").append(type).append("\"");
418                }
419                sb.append(">");
420
421                // dump header value as XML, use Camel type converter to convert
422                // to String
423                if (value != null) {
424                    try {
425                        String xml = message.getExchange().getContext().getTypeConverter().tryConvertTo(String.class,
426                                message.getExchange(), value);
427                        if (xml != null) {
428                            // must always xml encode
429                            sb.append(StringHelper.xmlEncode(xml));
430                        }
431                    } catch (Throwable e) {
432                        // ignore as the body is for logging purpose
433                    }
434                }
435
436                sb.append("</header>\n");
437            }
438            sb.append(prefix);
439            sb.append("  </headers>\n");
440        }
441
442        if (includeBody) {
443            sb.append(prefix);
444            sb.append("  <body");
445            String type = ObjectHelper.classCanonicalName(message.getBody());
446            if (type != null) {
447                sb.append(" type=\"").append(type).append("\"");
448            }
449            sb.append(">");
450
451            String xml = extractBodyForLogging(message, "", allowStreams, allowFiles, maxChars);
452            if (xml != null) {
453                // must always xml encode
454                sb.append(StringHelper.xmlEncode(xml));
455            }
456
457            sb.append("</body>\n");
458        }
459
460        sb.append(prefix);
461        sb.append("</message>");
462        return sb.toString();
463    }
464
465    /**
466     * Copies the headers from the source to the target message.
467     * 
468     * @param source the source message
469     * @param target the target message
470     * @param override whether to override existing headers
471     */
472    public static void copyHeaders(Message source, Message target, boolean override) {
473        copyHeaders(source, target, null, override);
474    }
475    
476    /**
477     * Copies the headers from the source to the target message.
478     * 
479     * @param source the source message
480     * @param target the target message
481     * @param strategy the header filter strategy which could help us to filter the protocol message headers
482     * @param override whether to override existing headers
483     */
484    public static void copyHeaders(Message source, Message target, HeaderFilterStrategy strategy, boolean override) {
485        if (!source.hasHeaders()) {
486            return;
487        }
488
489        for (Map.Entry<String, Object> entry : source.getHeaders().entrySet()) {
490            String key = entry.getKey();
491            Object value = entry.getValue();
492
493            if (target.getHeader(key) == null || override) {
494                if (strategy == null) {
495                    target.setHeader(key, value);
496                } else if (!strategy.applyFilterToExternalHeaders(key, value, target.getExchange())) {
497                    // Just make sure we don't copy the protocol headers to target
498                    target.setHeader(key, value);
499                }
500            }
501        }
502    }
503
504    /**
505     * Dumps the {@link MessageHistory} from the {@link Exchange} in a human readable format.
506     *
507     * @param exchange           the exchange
508     * @param exchangeFormatter  if provided then information about the exchange is included in the dump
509     * @param logStackTrace      whether to include a header for the stacktrace, to be added (not included in this dump).
510     * @return a human readable message history as a table
511     */
512    public static String dumpMessageHistoryStacktrace(Exchange exchange, ExchangeFormatter exchangeFormatter, boolean logStackTrace) {
513        // must not cause new exceptions so run this in a try catch block
514        try {
515            return doDumpMessageHistoryStacktrace(exchange, exchangeFormatter, logStackTrace);
516        } catch (Throwable e) {
517            // ignore as the body is for logging purpose
518            return "";
519        }
520    }
521
522    @SuppressWarnings("unchecked")
523    public static String doDumpMessageHistoryStacktrace(Exchange exchange, ExchangeFormatter exchangeFormatter, boolean logStackTrace) {
524        List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
525        if (list == null || list.isEmpty()) {
526            return null;
527        }
528
529        StringBuilder sb = new StringBuilder();
530        sb.append("\n");
531        sb.append("Message History\n");
532        sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
533        sb.append(String.format(MESSAGE_HISTORY_HEADER, "RouteId", "ProcessorId", "Processor", "Elapsed (ms)"));
534        sb.append("\n");
535
536        // add incoming origin of message on the top
537        String routeId = exchange.getFromRouteId();
538        String id = routeId;
539        String label = "";
540        if (exchange.getFromEndpoint() != null) {
541            label = URISupport.sanitizeUri(exchange.getFromEndpoint().getEndpointUri());
542        }
543        long elapsed = 0;
544        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
545        if (created != null) {
546            elapsed = new StopWatch(created).stop();
547        }
548
549        sb.append(String.format(MESSAGE_HISTORY_OUTPUT, routeId, id, label, elapsed));
550        sb.append("\n");
551
552        // and then each history
553        for (MessageHistory history : list) {
554            routeId = history.getRouteId() != null ? history.getRouteId() : "";
555            id = history.getNode().getId();
556            // we need to avoid leak the sensible information here
557            label =  URISupport.sanitizeUri(history.getNode().getLabel());
558            elapsed = history.getElapsed();
559
560            sb.append(String.format(MESSAGE_HISTORY_OUTPUT, routeId, id, label, elapsed));
561            sb.append("\n");
562        }
563
564        if (exchangeFormatter != null) {
565            sb.append("\nExchange\n");
566            sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
567            sb.append(exchangeFormatter.format(exchange));
568            sb.append("\n");
569        }
570
571        if (logStackTrace) {
572            sb.append("\nStacktrace\n");
573            sb.append("---------------------------------------------------------------------------------------------------------------------------------------");
574        }
575        return sb.toString();
576    }
577
578}