001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.util;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028
029import org.apache.camel.CamelContext;
030import org.apache.camel.CamelExchangeException;
031import org.apache.camel.CamelExecutionException;
032import org.apache.camel.Endpoint;
033import org.apache.camel.Exchange;
034import org.apache.camel.ExchangePattern;
035import org.apache.camel.InvalidPayloadException;
036import org.apache.camel.Message;
037import org.apache.camel.MessageHistory;
038import org.apache.camel.NoSuchBeanException;
039import org.apache.camel.NoSuchEndpointException;
040import org.apache.camel.NoSuchHeaderException;
041import org.apache.camel.NoSuchPropertyException;
042import org.apache.camel.NoTypeConversionAvailableException;
043import org.apache.camel.TypeConversionException;
044import org.apache.camel.TypeConverter;
045import org.apache.camel.impl.DefaultExchange;
046import org.apache.camel.impl.MessageSupport;
047import org.apache.camel.spi.UnitOfWork;
048
049/**
050 * Some helper methods for working with {@link Exchange} objects
051 *
052 * @version
053 */
054public final class ExchangeHelper {
055
056    /**
057     * Utility classes should not have a public constructor.
058     */
059    private ExchangeHelper() {
060    }
061
062    /**
063     * Extracts the Exchange.BINDING of the given type or null if not present
064     *
065     * @param exchange the message exchange
066     * @param type     the expected binding type
067     * @return the binding object of the given type or null if it could not be found or converted
068     */
069    public static <T> T getBinding(Exchange exchange, Class<T> type) {
070        return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
071    }
072
073    /**
074     * Attempts to resolve the endpoint for the given value
075     *
076     * @param exchange the message exchange being processed
077     * @param value    the value which can be an {@link Endpoint} or an object
078     *                 which provides a String representation of an endpoint via
079     *                 {@link #toString()}
080     * @return the endpoint
081     * @throws NoSuchEndpointException if the endpoint cannot be resolved
082     */
083    public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
084        Endpoint endpoint;
085        if (value instanceof Endpoint) {
086            endpoint = (Endpoint) value;
087        } else {
088            String uri = value.toString().trim();
089            endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
090        }
091        return endpoint;
092    }
093
094    /**
095     * Gets the mandatory property of the exchange of the correct type
096     *
097     * @param exchange      the exchange
098     * @param propertyName  the property name
099     * @param type          the type
100     * @return the property value
101     * @throws TypeConversionException is thrown if error during type conversion
102     * @throws NoSuchPropertyException is thrown if no property exists
103     */
104    public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
105        T result = exchange.getProperty(propertyName, type);
106        if (result != null) {
107            return result;
108        }
109        throw new NoSuchPropertyException(exchange, propertyName, type);
110    }
111
112    /**
113     * Gets the mandatory inbound header of the correct type
114     *
115     * @param exchange      the exchange
116     * @param headerName    the header name
117     * @param type          the type
118     * @return the header value
119     * @throws TypeConversionException is thrown if error during type conversion
120     * @throws NoSuchHeaderException is thrown if no headers exists
121     */
122    public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
123        T answer = exchange.getIn().getHeader(headerName, type);
124        if (answer == null) {
125            throw new NoSuchHeaderException(exchange, headerName, type);
126        }
127        return answer;
128    }
129
130    /**
131     * Returns the mandatory inbound message body of the correct type or throws
132     * an exception if it is not present
133     *
134     * @param exchange the exchange
135     * @return the body, is never <tt>null</tt>
136     * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type
137     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
138     */
139    @Deprecated
140    public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
141        return exchange.getIn().getMandatoryBody();
142    }
143
144    /**
145     * Returns the mandatory inbound message body of the correct type or throws
146     * an exception if it is not present
147     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
148     */
149    @Deprecated
150    public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
151        return exchange.getIn().getMandatoryBody(type);
152    }
153
154    /**
155     * Returns the mandatory outbound message body of the correct type or throws
156     * an exception if it is not present
157     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
158     */
159    @Deprecated
160    public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
161        return exchange.getOut().getMandatoryBody();
162    }
163
164    /**
165     * Returns the mandatory outbound message body of the correct type or throws
166     * an exception if it is not present
167     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
168     */
169    @Deprecated
170    public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
171        return exchange.getOut().getMandatoryBody(type);
172    }
173
174    /**
175     * Converts the value to the given expected type or throws an exception
176     *
177     * @return the converted value
178     * @throws TypeConversionException is thrown if error during type conversion
179     * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type
180     */
181    public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
182        throws TypeConversionException, NoTypeConversionAvailableException {
183        CamelContext camelContext = exchange.getContext();
184        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
185        TypeConverter converter = camelContext.getTypeConverter();
186        if (converter != null) {
187            return converter.mandatoryConvertTo(type, exchange, value);
188        }
189        throw new NoTypeConversionAvailableException(value, type);
190    }
191
192    /**
193     * Converts the value to the given expected type
194     *
195     * @return the converted value
196     * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
197     */
198    public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
199        CamelContext camelContext = exchange.getContext();
200        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
201        TypeConverter converter = camelContext.getTypeConverter();
202        if (converter != null) {
203            return converter.convertTo(type, exchange, value);
204        }
205        return null;
206    }
207
208    /**
209     * Creates a new instance and copies from the current message exchange so that it can be
210     * forwarded to another destination as a new instance. Unlike regular copy this operation
211     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
212     * for async messaging, where the original and copied exchange are independent.
213     *
214     * @param exchange original copy of the exchange
215     * @param handover whether the on completion callbacks should be handed over to the new copy.
216     */
217    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
218        return createCorrelatedCopy(exchange, handover, false);
219    }
220
221    /**
222     * Creates a new instance and copies from the current message exchange so that it can be
223     * forwarded to another destination as a new instance. Unlike regular copy this operation
224     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
225     * for async messaging, where the original and copied exchange are independent.
226     *
227     * @param exchange original copy of the exchange
228     * @param handover whether the on completion callbacks should be handed over to the new copy.
229     * @param useSameMessageId whether to use same message id on the copy message.
230     */
231    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
232        String id = exchange.getExchangeId();
233
234        // make sure to do a safe copy as the correlated copy can be routed independently of the source.
235        Exchange copy = exchange.copy(true);
236        // do not reuse message id on copy
237        if (!useSameMessageId) {
238            if (copy.hasOut()) {
239                copy.getOut().setMessageId(null);
240            }
241            copy.getIn().setMessageId(null);
242        }
243        // do not share the unit of work
244        copy.setUnitOfWork(null);
245        // do not reuse the message id
246        // hand over on completion to the copy if we got any
247        UnitOfWork uow = exchange.getUnitOfWork();
248        if (handover && uow != null) {
249            uow.handoverSynchronization(copy);
250        }
251        // set a correlation id so we can track back the original exchange
252        copy.setProperty(Exchange.CORRELATION_ID, id);
253        return copy;
254    }
255
256    /**
257     * Creates a new instance and copies from the current message exchange so that it can be
258     * forwarded to another destination as a new instance.
259     *
260     * @param exchange original copy of the exchange
261     * @param preserveExchangeId whether or not the exchange id should be preserved
262     * @return the copy
263     */
264    public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
265        Exchange copy = exchange.copy();
266        if (preserveExchangeId) {
267            // must preserve exchange id
268            copy.setExchangeId(exchange.getExchangeId());
269        }
270        return copy;
271    }
272
273    /**
274     * Copies the results of a message exchange from the source exchange to the result exchange
275     * which will copy the message contents, exchange properties and the exception.
276     * Notice the {@link ExchangePattern} is <b>not</b> copied/altered.
277     *
278     * @param result the result exchange which will have the output and error state added
279     * @param source the source exchange which is not modified
280     */
281    public static void copyResults(Exchange result, Exchange source) {
282
283        // --------------------------------------------------------------------
284        //  TODO: merge logic with that of copyResultsPreservePattern()
285        // --------------------------------------------------------------------
286
287        if (result == source) {
288            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
289            // and the result is not failed
290            if (result.getPattern() == ExchangePattern.InOptionalOut) {
291                // keep as is
292            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
293                // copy IN to OUT as we expect a OUT response
294                result.getOut().copyFrom(source.getIn());
295            }
296            return;
297        }
298
299        if (result != source) {
300            result.setException(source.getException());
301            if (source.hasOut()) {
302                result.getOut().copyFrom(source.getOut());
303            } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
304                // special case where the result is InOptionalOut and with no OUT response
305                // so we should return null to indicate this fact
306                result.setOut(null);
307            } else {
308                // no results so lets copy the last input
309                // as the final processor on a pipeline might not
310                // have created any OUT; such as a mock:endpoint
311                // so lets assume the last IN is the OUT
312                if (result.getPattern().isOutCapable()) {
313                    // only set OUT if its OUT capable
314                    result.getOut().copyFrom(source.getIn());
315                } else {
316                    // if not replace IN instead to keep the MEP
317                    result.getIn().copyFrom(source.getIn());
318                    // clear any existing OUT as the result is on the IN
319                    if (result.hasOut()) {
320                        result.setOut(null);
321                    }
322                }
323            }
324
325            if (source.hasProperties()) {
326                result.getProperties().putAll(source.getProperties());
327            }
328        }
329    }
330
331    /**
332     * Copies the <code>source</code> exchange to <code>target</code> exchange
333     * preserving the {@link ExchangePattern} of <code>target</code>.
334     *
335     * @param source source exchange.
336     * @param result target exchange.
337     */
338    public static void copyResultsPreservePattern(Exchange result, Exchange source) {
339
340        // --------------------------------------------------------------------
341        //  TODO: merge logic with that of copyResults()
342        // --------------------------------------------------------------------
343
344        if (result == source) {
345            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
346            // and the result is not failed
347            if (result.getPattern() == ExchangePattern.InOptionalOut) {
348                // keep as is
349            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
350                // copy IN to OUT as we expect a OUT response
351                result.getOut().copyFrom(source.getIn());
352            }
353            return;
354        }
355
356        // copy in message
357        result.getIn().copyFrom(source.getIn());
358
359        // copy out message
360        if (source.hasOut()) {
361            // exchange pattern sensitive
362            Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
363            resultMessage.copyFrom(source.getOut());
364        }
365
366        // copy exception
367        result.setException(source.getException());
368
369        // copy properties
370        if (source.hasProperties()) {
371            result.getProperties().putAll(source.getProperties());
372        }
373    }
374
375    /**
376     * Returns the message where to write results in an
377     * exchange-pattern-sensitive way.
378     *
379     * @param exchange message exchange.
380     * @return result message.
381     */
382    public static Message getResultMessage(Exchange exchange) {
383        if (exchange.getPattern().isOutCapable()) {
384            return exchange.getOut();
385        } else {
386            return exchange.getIn();
387        }
388    }
389
390    /**
391     * Returns true if the given exchange pattern (if defined) can support OUT messages
392     *
393     * @param exchange the exchange to interrogate
394     * @return true if the exchange is defined as an {@link ExchangePattern} which supports
395     *         OUT messages
396     */
397    public static boolean isOutCapable(Exchange exchange) {
398        ExchangePattern pattern = exchange.getPattern();
399        return pattern != null && pattern.isOutCapable();
400    }
401
402    /**
403     * Creates a new instance of the given type from the injector
404     *
405     * @param exchange the exchange
406     * @param type     the given type
407     * @return the created instance of the given type
408     */
409    public static <T> T newInstance(Exchange exchange, Class<T> type) {
410        return exchange.getContext().getInjector().newInstance(type);
411    }
412
413    /**
414     * Creates a Map of the variables which are made available to a script or template
415     *
416     * @param exchange the exchange to make available
417     * @return a Map populated with the require variables
418     */
419    public static Map<String, Object> createVariableMap(Exchange exchange) {
420        Map<String, Object> answer = new HashMap<String, Object>();
421        populateVariableMap(exchange, answer);
422        return answer;
423    }
424
425    /**
426     * Populates the Map with the variables which are made available to a script or template
427     *
428     * @param exchange the exchange to make available
429     * @param map      the map to populate
430     */
431    public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
432        map.put("exchange", exchange);
433        Message in = exchange.getIn();
434        map.put("in", in);
435        map.put("request", in);
436        map.put("headers", in.getHeaders());
437        map.put("body", in.getBody());
438        if (isOutCapable(exchange)) {
439            // if we are out capable then set out and response as well
440            // however only grab OUT if it exists, otherwise reuse IN
441            // this prevents side effects to alter the Exchange if we force creating an OUT message
442            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
443            map.put("out", msg);
444            map.put("response", msg);
445        }
446        map.put("camelContext", exchange.getContext());
447    }
448
449    /**
450     * Returns the MIME content type on the input message or null if one is not defined
451     *
452     * @param exchange the exchange
453     * @return the MIME content type
454     */
455    public static String getContentType(Exchange exchange) {
456        return MessageHelper.getContentType(exchange.getIn());
457    }
458
459    /**
460     * Returns the MIME content encoding on the input message or null if one is not defined
461     *
462     * @param exchange the exchange
463     * @return the MIME content encoding
464     */
465    public static String getContentEncoding(Exchange exchange) {
466        return MessageHelper.getContentEncoding(exchange.getIn());
467    }
468
469    /**
470     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
471     *
472     * @param exchange the exchange
473     * @param name     the bean name
474     * @return the bean
475     * @throws NoSuchBeanException if no bean could be found in the registry
476     */
477    public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
478        Object value = lookupBean(exchange, name);
479        if (value == null) {
480            throw new NoSuchBeanException(name);
481        }
482        return value;
483    }
484
485    /**
486     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
487     *
488     * @param exchange the exchange
489     * @param name     the bean name
490     * @param type     the expected bean type
491     * @return the bean
492     * @throws NoSuchBeanException if no bean could be found in the registry
493     */
494    public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
495        T value = lookupBean(exchange, name, type);
496        if (value == null) {
497            throw new NoSuchBeanException(name);
498        }
499        return value;
500    }
501
502    /**
503     * Performs a lookup in the registry of the bean name
504     *
505     * @param exchange the exchange
506     * @param name     the bean name
507     * @return the bean, or <tt>null</tt> if no bean could be found
508     */
509    public static Object lookupBean(Exchange exchange, String name) {
510        return exchange.getContext().getRegistry().lookupByName(name);
511    }
512
513    /**
514     * Performs a lookup in the registry of the bean name and type
515     *
516     * @param exchange the exchange
517     * @param name     the bean name
518     * @param type     the expected bean type
519     * @return the bean, or <tt>null</tt> if no bean could be found
520     */
521    public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
522        return exchange.getContext().getRegistry().lookupByNameAndType(name, type);
523    }
524
525    /**
526     * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
527     * or null if none could be found
528     *
529     * @param exchanges  the exchanges
530     * @param exchangeId the exchangeId to find
531     * @return matching exchange, or <tt>null</tt> if none found
532     */
533    public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
534        for (Exchange exchange : exchanges) {
535            String id = exchange.getExchangeId();
536            if (id != null && id.equals(exchangeId)) {
537                return exchange;
538            }
539        }
540        return null;
541    }
542
543    /**
544     * Prepares the exchanges for aggregation.
545     * <p/>
546     * This implementation will copy the OUT body to the IN body so when you do
547     * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
548     *
549     * @param oldExchange the old exchange
550     * @param newExchange the new exchange
551     */
552    public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
553        // move body/header from OUT to IN
554        if (oldExchange != null) {
555            if (oldExchange.hasOut()) {
556                oldExchange.setIn(oldExchange.getOut());
557                oldExchange.setOut(null);
558            }
559        }
560
561        if (newExchange != null) {
562            if (newExchange.hasOut()) {
563                newExchange.setIn(newExchange.getOut());
564                newExchange.setOut(null);
565            }
566        }
567    }
568
569    /**
570     * Checks whether the exchange has been failure handed
571     *
572     * @param exchange  the exchange
573     * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
574     */
575    public static boolean isFailureHandled(Exchange exchange) {
576        return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
577    }
578
579    /**
580     * Checks whether the exchange {@link UnitOfWork} is exhausted
581     *
582     * @param exchange  the exchange
583     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
584     */
585    public static boolean isUnitOfWorkExhausted(Exchange exchange) {
586        return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
587    }
588
589    /**
590     * Sets the exchange to be failure handled.
591     *
592     * @param exchange  the exchange
593     */
594    public static void setFailureHandled(Exchange exchange) {
595        exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
596        // clear exception since its failure handled
597        exchange.setException(null);
598    }
599
600    /**
601     * Checks whether the exchange is redelivery exhausted
602     *
603     * @param exchange  the exchange
604     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
605     */
606    public static boolean isRedeliveryExhausted(Exchange exchange) {
607        return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
608    }
609
610    /**
611     * Checks whether the exchange {@link UnitOfWork} is redelivered
612     *
613     * @param exchange  the exchange
614     * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
615     */
616    public static boolean isRedelivered(Exchange exchange) {
617        return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
618    }
619
620    /**
621     * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
622     *
623     * @param exchange  the exchange
624     * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
625     */
626    public static boolean isInterrupted(Exchange exchange) {
627        return exchange.getException(InterruptedException.class) != null;
628    }
629
630    /**
631     * Check whether or not stream caching is enabled for the given route or globally.
632     *
633     * @param exchange  the exchange
634     * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
635     */
636    public static boolean isStreamCachingEnabled(final Exchange exchange) {
637        if (exchange.getFromRouteId() == null) {
638            return exchange.getContext().getStreamCachingStrategy().isEnabled();
639        } else {
640            return exchange.getContext().getRoute(exchange.getFromRouteId()).getRouteContext().isStreamCaching();
641        }
642    }
643
644    /**
645     * Extracts the body from the given exchange.
646     * <p/>
647     * If the exchange pattern is provided it will try to honor it and retrieve the body
648     * from either IN or OUT according to the pattern.
649     *
650     * @param exchange the exchange
651     * @param pattern  exchange pattern if given, can be <tt>null</tt>
652     * @return the result body, can be <tt>null</tt>.
653     * @throws CamelExecutionException is thrown if the processing of the exchange failed
654     */
655    public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
656        Object answer = null;
657        if (exchange != null) {
658            // rethrow if there was an exception during execution
659            if (exchange.getException() != null) {
660                throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
661            }
662
663            // result could have a fault message
664            if (hasFaultMessage(exchange)) {
665                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
666                answer = msg.getBody();
667                return answer;
668            }
669
670            // okay no fault then return the response according to the pattern
671            // try to honor pattern if provided
672            boolean notOut = pattern != null && !pattern.isOutCapable();
673            boolean hasOut = exchange.hasOut();
674            if (hasOut && !notOut) {
675                // we have a response in out and the pattern is out capable
676                answer = exchange.getOut().getBody();
677            } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
678                // special case where the result is InOptionalOut and with no OUT response
679                // so we should return null to indicate this fact
680                answer = null;
681            } else {
682                // use IN as the response
683                answer = exchange.getIn().getBody();
684            }
685        }
686        return answer;
687    }
688
689    /**
690     * Tests whether the exchange has a fault message set and that its not null.
691     *
692     * @param exchange the exchange
693     * @return <tt>true</tt> if fault message exists
694     */
695    public static boolean hasFaultMessage(Exchange exchange) {
696        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
697        return msg.isFault() && msg.getBody() != null;
698    }
699
700    /**
701     * Tests whether the exchange has already been handled by the error handler
702     *
703     * @param exchange the exchange
704     * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
705     */
706    public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
707        return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
708    }
709
710    /**
711     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
712     * <p/>
713     * Will wait until the future task is complete.
714     *
715     * @param context the camel context
716     * @param future  the future handle
717     * @param type    the expected body response type
718     * @return the result body, can be <tt>null</tt>.
719     * @throws CamelExecutionException is thrown if the processing of the exchange failed
720     */
721    public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) {
722        try {
723            return doExtractFutureBody(context, future.get(), type);
724        } catch (InterruptedException e) {
725            throw ObjectHelper.wrapRuntimeCamelException(e);
726        } catch (ExecutionException e) {
727            // execution failed due to an exception so rethrow the cause
728            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
729        } finally {
730            // its harmless to cancel if task is already completed
731            // and in any case we do not want to get hold of the task a 2nd time
732            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
733            future.cancel(true);
734        }
735    }
736
737    /**
738     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
739     * <p/>
740     * Will wait for the future task to complete, but waiting at most the timeout value.
741     *
742     * @param context the camel context
743     * @param future  the future handle
744     * @param timeout timeout value
745     * @param unit    timeout unit
746     * @param type    the expected body response type
747     * @return the result body, can be <tt>null</tt>.
748     * @throws CamelExecutionException is thrown if the processing of the exchange failed
749     * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
750     */
751    public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
752        try {
753            if (timeout > 0) {
754                return doExtractFutureBody(context, future.get(timeout, unit), type);
755            } else {
756                return doExtractFutureBody(context, future.get(), type);
757            }
758        } catch (InterruptedException e) {
759            // execution failed due interruption so rethrow the cause
760            throw ObjectHelper.wrapCamelExecutionException(null, e);
761        } catch (ExecutionException e) {
762            // execution failed due to an exception so rethrow the cause
763            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
764        } finally {
765            // its harmless to cancel if task is already completed
766            // and in any case we do not want to get hold of the task a 2nd time
767            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
768            future.cancel(true);
769        }
770    }
771
772    private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
773        if (result == null) {
774            return null;
775        }
776        if (type.isAssignableFrom(result.getClass())) {
777            return type.cast(result);
778        }
779        if (result instanceof Exchange) {
780            Exchange exchange = (Exchange) result;
781            Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
782            return context.getTypeConverter().convertTo(type, exchange, answer);
783        }
784        return context.getTypeConverter().convertTo(type, result);
785    }
786
787    /**
788     * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead
789     */
790    @Deprecated
791    public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
792        return CamelExchangeException.createExceptionMessage(message, exchange, cause);
793    }
794
795    /**
796     * Strategy to prepare results before next iterator or when we are complete,
797     * which is done by copying OUT to IN, so there is only an IN as input
798     * for the next iteration.
799     *
800     * @param exchange the exchange to prepare
801     */
802    public static void prepareOutToIn(Exchange exchange) {
803        // we are routing using pipes and filters so we need to manually copy OUT to IN
804        if (exchange.hasOut()) {
805            exchange.setIn(exchange.getOut());
806            exchange.setOut(null);
807        }
808    }
809
810    /**
811     * Gets both the messageId and exchangeId to be used for logging purposes.
812     * <p/>
813     * Logging both ids, can help to correlate exchanges which may be redelivered messages
814     * from for example a JMS broker.
815     *
816     * @param exchange the exchange
817     * @return a log message with both the messageId and exchangeId
818     */
819    public static String logIds(Exchange exchange) {
820        String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
821        return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId()  + ")";
822    }
823
824    /**
825     * Copies the exchange but the copy will be tied to the given context
826     *
827     * @param exchange  the source exchange
828     * @param context   the camel context
829     * @return a copy with the given camel context
830     */
831    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
832        return copyExchangeAndSetCamelContext(exchange, context, true);
833    }
834
835    /**
836     * Copies the exchange but the copy will be tied to the given context
837     *
838     * @param exchange  the source exchange
839     * @param context   the camel context
840     * @param handover  whether to handover on completions from the source to the copy
841     * @return a copy with the given camel context
842     */
843    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
844        DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
845        if (exchange.hasProperties()) {
846            answer.setProperties(safeCopy(exchange.getProperties()));
847        }
848        if (handover) {
849            // Need to hand over the completion for async invocation
850            exchange.handoverCompletions(answer);
851        }
852        answer.setIn(exchange.getIn().copy());
853        if (exchange.hasOut()) {
854            answer.setOut(exchange.getOut().copy());
855        }
856        answer.setException(exchange.getException());
857        return answer;
858    }
859
860    /**
861     * Replaces the existing message with the new message
862     *
863     * @param exchange  the exchange
864     * @param newMessage the new message
865     * @param outOnly    whether to replace the message as OUT message
866     */
867    public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) {
868        Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
869        if (outOnly || exchange.hasOut()) {
870            exchange.setOut(newMessage);
871        } else {
872            exchange.setIn(newMessage);
873        }
874
875        // need to de-reference old from the exchange so it can be GC
876        if (old instanceof MessageSupport) {
877            ((MessageSupport) old).setExchange(null);
878        }
879    }
880
881    /**
882     * Gets the original IN {@link Message} this Unit of Work was started with.
883     * <p/>
884     * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()}
885     * is enabled. If its disabled, then <tt>null</tt> is returned.
886     *
887     * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled.
888     */
889    public static Message getOriginalInMessage(Exchange exchange) {
890        Message answer = null;
891
892        // try parent first
893        UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
894        if (uow != null) {
895            answer = uow.getOriginalInMessage();
896        }
897        // fallback to the current exchange
898        if (answer == null) {
899            uow = exchange.getUnitOfWork();
900            if (uow != null) {
901                answer = uow.getOriginalInMessage();
902            }
903        }
904        return answer;
905    }
906
907    @SuppressWarnings("unchecked")
908    private static Map<String, Object> safeCopy(Map<String, Object> properties) {
909        if (properties == null) {
910            return null;
911        }
912
913        Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties);
914
915        // safe copy message history using a defensive copy
916        List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
917        if (history != null) {
918            answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
919        }
920
921        return answer;
922    }
923}