001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.util;
018
019import java.util.HashMap;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028import java.util.function.Predicate;
029
030import org.apache.camel.CamelContext;
031import org.apache.camel.CamelExchangeException;
032import org.apache.camel.CamelExecutionException;
033import org.apache.camel.Endpoint;
034import org.apache.camel.Exchange;
035import org.apache.camel.ExchangePattern;
036import org.apache.camel.InvalidPayloadException;
037import org.apache.camel.Message;
038import org.apache.camel.MessageHistory;
039import org.apache.camel.NoSuchBeanException;
040import org.apache.camel.NoSuchEndpointException;
041import org.apache.camel.NoSuchHeaderException;
042import org.apache.camel.NoSuchPropertyException;
043import org.apache.camel.NoTypeConversionAvailableException;
044import org.apache.camel.TypeConversionException;
045import org.apache.camel.TypeConverter;
046import org.apache.camel.impl.DefaultExchange;
047import org.apache.camel.impl.MessageSupport;
048import org.apache.camel.spi.Synchronization;
049import org.apache.camel.spi.UnitOfWork;
050
051/**
052 * Some helper methods for working with {@link Exchange} objects
053 *
054 * @version
055 */
056public final class ExchangeHelper {
057
058    /**
059     * Utility classes should not have a public constructor.
060     */
061    private ExchangeHelper() {
062    }
063
064    /**
065     * Extracts the Exchange.BINDING of the given type or null if not present
066     *
067     * @param exchange the message exchange
068     * @param type     the expected binding type
069     * @return the binding object of the given type or null if it could not be found or converted
070     */
071    public static <T> T getBinding(Exchange exchange, Class<T> type) {
072        return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
073    }
074
075    /**
076     * Attempts to resolve the endpoint for the given value
077     *
078     * @param exchange the message exchange being processed
079     * @param value    the value which can be an {@link Endpoint} or an object
080     *                 which provides a String representation of an endpoint via
081     *                 {@link #toString()}
082     * @return the endpoint
083     * @throws NoSuchEndpointException if the endpoint cannot be resolved
084     */
085    public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
086        Endpoint endpoint;
087        if (value instanceof Endpoint) {
088            endpoint = (Endpoint) value;
089        } else {
090            String uri = value.toString().trim();
091            endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
092        }
093        return endpoint;
094    }
095
096    /**
097     * Gets the mandatory property of the exchange of the correct type
098     *
099     * @param exchange      the exchange
100     * @param propertyName  the property name
101     * @param type          the type
102     * @return the property value
103     * @throws TypeConversionException is thrown if error during type conversion
104     * @throws NoSuchPropertyException is thrown if no property exists
105     */
106    public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
107        T result = exchange.getProperty(propertyName, type);
108        if (result != null) {
109            return result;
110        }
111        throw new NoSuchPropertyException(exchange, propertyName, type);
112    }
113
114    /**
115     * Gets the mandatory inbound header of the correct type
116     *
117     * @param exchange      the exchange
118     * @param headerName    the header name
119     * @param type          the type
120     * @return the header value
121     * @throws TypeConversionException is thrown if error during type conversion
122     * @throws NoSuchHeaderException is thrown if no headers exists
123     */
124    public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
125        T answer = exchange.getIn().getHeader(headerName, type);
126        if (answer == null) {
127            throw new NoSuchHeaderException(exchange, headerName, type);
128        }
129        return answer;
130    }
131
132    /**
133     * Returns the mandatory inbound message body of the correct type or throws
134     * an exception if it is not present
135     *
136     * @param exchange the exchange
137     * @return the body, is never <tt>null</tt>
138     * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type
139     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
140     */
141    @Deprecated
142    public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
143        return exchange.getIn().getMandatoryBody();
144    }
145
146    /**
147     * Returns the mandatory inbound message body of the correct type or throws
148     * an exception if it is not present
149     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
150     */
151    @Deprecated
152    public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
153        return exchange.getIn().getMandatoryBody(type);
154    }
155
156    /**
157     * Returns the mandatory outbound message body of the correct type or throws
158     * an exception if it is not present
159     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
160     */
161    @Deprecated
162    public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
163        return exchange.getOut().getMandatoryBody();
164    }
165
166    /**
167     * Returns the mandatory outbound message body of the correct type or throws
168     * an exception if it is not present
169     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
170     */
171    @Deprecated
172    public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
173        return exchange.getOut().getMandatoryBody(type);
174    }
175
176    /**
177     * Converts the value to the given expected type or throws an exception
178     *
179     * @return the converted value
180     * @throws TypeConversionException is thrown if error during type conversion
181     * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type
182     */
183    public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
184        throws TypeConversionException, NoTypeConversionAvailableException {
185        CamelContext camelContext = exchange.getContext();
186        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
187        TypeConverter converter = camelContext.getTypeConverter();
188        if (converter != null) {
189            return converter.mandatoryConvertTo(type, exchange, value);
190        }
191        throw new NoTypeConversionAvailableException(value, type);
192    }
193
194    /**
195     * Converts the value to the given expected type
196     *
197     * @return the converted value
198     * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
199     */
200    public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
201        CamelContext camelContext = exchange.getContext();
202        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
203        TypeConverter converter = camelContext.getTypeConverter();
204        if (converter != null) {
205            return converter.convertTo(type, exchange, value);
206        }
207        return null;
208    }
209
210    /**
211     * Creates a new instance and copies from the current message exchange so that it can be
212     * forwarded to another destination as a new instance. Unlike regular copy this operation
213     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
214     * for async messaging, where the original and copied exchange are independent.
215     *
216     * @param exchange original copy of the exchange
217     * @param handover whether the on completion callbacks should be handed over to the new copy.
218     */
219    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
220        return createCorrelatedCopy(exchange, handover, false);
221    }
222
223    /**
224     * Creates a new instance and copies from the current message exchange so that it can be
225     * forwarded to another destination as a new instance. Unlike regular copy this operation
226     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
227     * for async messaging, where the original and copied exchange are independent.
228     *
229     * @param exchange original copy of the exchange
230     * @param handover whether the on completion callbacks should be handed over to the new copy.
231     * @param useSameMessageId whether to use same message id on the copy message.
232     */
233    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
234        return createCorrelatedCopy(exchange, handover, useSameMessageId, null);
235    }
236
237    /**
238     * Creates a new instance and copies from the current message exchange so that it can be
239     * forwarded to another destination as a new instance. Unlike regular copy this operation
240     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
241     * for async messaging, where the original and copied exchange are independent.
242     *
243     * @param exchange original copy of the exchange
244     * @param handover whether the on completion callbacks should be handed over to the new copy.
245     * @param useSameMessageId whether to use same message id on the copy message.
246     * @param filter whether to handover the on completion
247     */
248    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId, Predicate<Synchronization> filter) {
249        String id = exchange.getExchangeId();
250
251        // make sure to do a safe copy as the correlated copy can be routed independently of the source.
252        Exchange copy = exchange.copy(true);
253        // do not reuse message id on copy
254        if (!useSameMessageId) {
255            if (copy.hasOut()) {
256                copy.getOut().setMessageId(null);
257            }
258            copy.getIn().setMessageId(null);
259        }
260        // do not share the unit of work
261        copy.setUnitOfWork(null);
262        // do not reuse the message id
263        // hand over on completion to the copy if we got any
264        UnitOfWork uow = exchange.getUnitOfWork();
265        if (handover && uow != null) {
266            uow.handoverSynchronization(copy, filter);
267        }
268        // set a correlation id so we can track back the original exchange
269        copy.setProperty(Exchange.CORRELATION_ID, id);
270        return copy;
271    }
272
273    /**
274     * Creates a new instance and copies from the current message exchange so that it can be
275     * forwarded to another destination as a new instance.
276     *
277     * @param exchange original copy of the exchange
278     * @param preserveExchangeId whether or not the exchange id should be preserved
279     * @return the copy
280     */
281    public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
282        Exchange copy = exchange.copy();
283        if (preserveExchangeId) {
284            // must preserve exchange id
285            copy.setExchangeId(exchange.getExchangeId());
286        }
287        return copy;
288    }
289
290    /**
291     * Copies the results of a message exchange from the source exchange to the result exchange
292     * which will copy the message contents, exchange properties and the exception.
293     * Notice the {@link ExchangePattern} is <b>not</b> copied/altered.
294     *
295     * @param result the result exchange which will have the output and error state added
296     * @param source the source exchange which is not modified
297     */
298    public static void copyResults(Exchange result, Exchange source) {
299
300        // --------------------------------------------------------------------
301        //  TODO: merge logic with that of copyResultsPreservePattern()
302        // --------------------------------------------------------------------
303
304        if (result == source) {
305            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
306            // and the result is not failed
307            if (result.getPattern() == ExchangePattern.InOptionalOut) {
308                // keep as is
309            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
310                // copy IN to OUT as we expect a OUT response
311                result.getOut().copyFrom(source.getIn());
312            }
313            return;
314        }
315
316        if (result != source) {
317            result.setException(source.getException());
318            if (source.hasOut()) {
319                result.getOut().copyFrom(source.getOut());
320            } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
321                // special case where the result is InOptionalOut and with no OUT response
322                // so we should return null to indicate this fact
323                result.setOut(null);
324            } else {
325                // no results so lets copy the last input
326                // as the final processor on a pipeline might not
327                // have created any OUT; such as a mock:endpoint
328                // so lets assume the last IN is the OUT
329                if (result.getPattern().isOutCapable()) {
330                    // only set OUT if its OUT capable
331                    result.getOut().copyFrom(source.getIn());
332                } else {
333                    // if not replace IN instead to keep the MEP
334                    result.getIn().copyFrom(source.getIn());
335                    // clear any existing OUT as the result is on the IN
336                    if (result.hasOut()) {
337                        result.setOut(null);
338                    }
339                }
340            }
341
342            if (source.hasProperties()) {
343                result.getProperties().putAll(source.getProperties());
344            }
345        }
346    }
347
348    /**
349     * Copies the <code>source</code> exchange to <code>target</code> exchange
350     * preserving the {@link ExchangePattern} of <code>target</code>.
351     *
352     * @param source source exchange.
353     * @param result target exchange.
354     */
355    public static void copyResultsPreservePattern(Exchange result, Exchange source) {
356
357        // --------------------------------------------------------------------
358        //  TODO: merge logic with that of copyResults()
359        // --------------------------------------------------------------------
360
361        if (result == source) {
362            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
363            // and the result is not failed
364            if (result.getPattern() == ExchangePattern.InOptionalOut) {
365                // keep as is
366            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
367                // copy IN to OUT as we expect a OUT response
368                result.getOut().copyFrom(source.getIn());
369            }
370            return;
371        }
372
373        // copy in message
374        result.getIn().copyFrom(source.getIn());
375
376        // copy out message
377        if (source.hasOut()) {
378            // exchange pattern sensitive
379            Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
380            resultMessage.copyFrom(source.getOut());
381        }
382
383        // copy exception
384        result.setException(source.getException());
385
386        // copy properties
387        if (source.hasProperties()) {
388            result.getProperties().putAll(source.getProperties());
389        }
390    }
391
392    /**
393     * Returns the message where to write results in an
394     * exchange-pattern-sensitive way.
395     *
396     * @param exchange message exchange.
397     * @return result message.
398     */
399    public static Message getResultMessage(Exchange exchange) {
400        if (exchange.getPattern().isOutCapable()) {
401            return exchange.getOut();
402        } else {
403            return exchange.getIn();
404        }
405    }
406
407    /**
408     * Returns true if the given exchange pattern (if defined) can support OUT messages
409     *
410     * @param exchange the exchange to interrogate
411     * @return true if the exchange is defined as an {@link ExchangePattern} which supports
412     *         OUT messages
413     */
414    public static boolean isOutCapable(Exchange exchange) {
415        ExchangePattern pattern = exchange.getPattern();
416        return pattern != null && pattern.isOutCapable();
417    }
418
419    /**
420     * Creates a new instance of the given type from the injector
421     *
422     * @param exchange the exchange
423     * @param type     the given type
424     * @return the created instance of the given type
425     */
426    public static <T> T newInstance(Exchange exchange, Class<T> type) {
427        return exchange.getContext().getInjector().newInstance(type);
428    }
429
430    /**
431     * Creates a Map of the variables which are made available to a script or template
432     *
433     * @param exchange the exchange to make available
434     * @return a Map populated with the require variables
435     */
436    public static Map<String, Object> createVariableMap(Exchange exchange) {
437        Map<String, Object> answer = new HashMap<String, Object>();
438        populateVariableMap(exchange, answer);
439        return answer;
440    }
441
442    /**
443     * Populates the Map with the variables which are made available to a script or template
444     *
445     * @param exchange the exchange to make available
446     * @param map      the map to populate
447     */
448    public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
449        map.put("exchange", exchange);
450        Message in = exchange.getIn();
451        map.put("in", in);
452        map.put("request", in);
453        map.put("headers", in.getHeaders());
454        map.put("body", in.getBody());
455        if (isOutCapable(exchange)) {
456            // if we are out capable then set out and response as well
457            // however only grab OUT if it exists, otherwise reuse IN
458            // this prevents side effects to alter the Exchange if we force creating an OUT message
459            Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
460            map.put("out", msg);
461            map.put("response", msg);
462        }
463        map.put("camelContext", exchange.getContext());
464    }
465
466    /**
467     * Returns the MIME content type on the input message or null if one is not defined
468     *
469     * @param exchange the exchange
470     * @return the MIME content type
471     */
472    public static String getContentType(Exchange exchange) {
473        return MessageHelper.getContentType(exchange.getIn());
474    }
475
476    /**
477     * Returns the MIME content encoding on the input message or null if one is not defined
478     *
479     * @param exchange the exchange
480     * @return the MIME content encoding
481     */
482    public static String getContentEncoding(Exchange exchange) {
483        return MessageHelper.getContentEncoding(exchange.getIn());
484    }
485
486    /**
487     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
488     *
489     * @param exchange the exchange
490     * @param name     the bean name
491     * @return the bean
492     * @throws NoSuchBeanException if no bean could be found in the registry
493     */
494    public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
495        Object value = lookupBean(exchange, name);
496        if (value == null) {
497            throw new NoSuchBeanException(name);
498        }
499        return value;
500    }
501
502    /**
503     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
504     *
505     * @param exchange the exchange
506     * @param name     the bean name
507     * @param type     the expected bean type
508     * @return the bean
509     * @throws NoSuchBeanException if no bean could be found in the registry
510     */
511    public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
512        T value = lookupBean(exchange, name, type);
513        if (value == null) {
514            throw new NoSuchBeanException(name);
515        }
516        return value;
517    }
518
519    /**
520     * Performs a lookup in the registry of the bean name
521     *
522     * @param exchange the exchange
523     * @param name     the bean name
524     * @return the bean, or <tt>null</tt> if no bean could be found
525     */
526    public static Object lookupBean(Exchange exchange, String name) {
527        return exchange.getContext().getRegistry().lookupByName(name);
528    }
529
530    /**
531     * Performs a lookup in the registry of the bean name and type
532     *
533     * @param exchange the exchange
534     * @param name     the bean name
535     * @param type     the expected bean type
536     * @return the bean, or <tt>null</tt> if no bean could be found
537     */
538    public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
539        return exchange.getContext().getRegistry().lookupByNameAndType(name, type);
540    }
541
542    /**
543     * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
544     * or null if none could be found
545     *
546     * @param exchanges  the exchanges
547     * @param exchangeId the exchangeId to find
548     * @return matching exchange, or <tt>null</tt> if none found
549     */
550    public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
551        for (Exchange exchange : exchanges) {
552            String id = exchange.getExchangeId();
553            if (id != null && id.equals(exchangeId)) {
554                return exchange;
555            }
556        }
557        return null;
558    }
559
560    /**
561     * Prepares the exchanges for aggregation.
562     * <p/>
563     * This implementation will copy the OUT body to the IN body so when you do
564     * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
565     *
566     * @param oldExchange the old exchange
567     * @param newExchange the new exchange
568     */
569    public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
570        // move body/header from OUT to IN
571        if (oldExchange != null) {
572            if (oldExchange.hasOut()) {
573                oldExchange.setIn(oldExchange.getOut());
574                oldExchange.setOut(null);
575            }
576        }
577
578        if (newExchange != null) {
579            if (newExchange.hasOut()) {
580                newExchange.setIn(newExchange.getOut());
581                newExchange.setOut(null);
582            }
583        }
584    }
585
586    /**
587     * Checks whether the exchange has been failure handed
588     *
589     * @param exchange  the exchange
590     * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
591     */
592    public static boolean isFailureHandled(Exchange exchange) {
593        return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
594    }
595
596    /**
597     * Checks whether the exchange {@link UnitOfWork} is exhausted
598     *
599     * @param exchange  the exchange
600     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
601     */
602    public static boolean isUnitOfWorkExhausted(Exchange exchange) {
603        return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
604    }
605
606    /**
607     * Sets the exchange to be failure handled.
608     *
609     * @param exchange  the exchange
610     */
611    public static void setFailureHandled(Exchange exchange) {
612        exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
613        // clear exception since its failure handled
614        exchange.setException(null);
615    }
616
617    /**
618     * Checks whether the exchange is redelivery exhausted
619     *
620     * @param exchange  the exchange
621     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
622     */
623    public static boolean isRedeliveryExhausted(Exchange exchange) {
624        return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
625    }
626
627    /**
628     * Checks whether the exchange {@link UnitOfWork} is redelivered
629     *
630     * @param exchange  the exchange
631     * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
632     */
633    public static boolean isRedelivered(Exchange exchange) {
634        return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
635    }
636
637    /**
638     * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
639     *
640     * @param exchange  the exchange
641     * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
642     */
643    public static boolean isInterrupted(Exchange exchange) {
644        return exchange.getException(InterruptedException.class) != null;
645    }
646
647    /**
648     * Check whether or not stream caching is enabled for the given route or globally.
649     *
650     * @param exchange  the exchange
651     * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
652     */
653    public static boolean isStreamCachingEnabled(final Exchange exchange) {
654        if (exchange.getFromRouteId() == null) {
655            return exchange.getContext().getStreamCachingStrategy().isEnabled();
656        } else {
657            return exchange.getContext().getRoute(exchange.getFromRouteId()).getRouteContext().isStreamCaching();
658        }
659    }
660
661    /**
662     * Extracts the body from the given exchange.
663     * <p/>
664     * If the exchange pattern is provided it will try to honor it and retrieve the body
665     * from either IN or OUT according to the pattern.
666     *
667     * @param exchange the exchange
668     * @param pattern  exchange pattern if given, can be <tt>null</tt>
669     * @return the result body, can be <tt>null</tt>.
670     * @throws CamelExecutionException is thrown if the processing of the exchange failed
671     */
672    public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
673        Object answer = null;
674        if (exchange != null) {
675            // rethrow if there was an exception during execution
676            if (exchange.getException() != null) {
677                throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
678            }
679
680            // result could have a fault message
681            if (hasFaultMessage(exchange)) {
682                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
683                answer = msg.getBody();
684                return answer;
685            }
686
687            // okay no fault then return the response according to the pattern
688            // try to honor pattern if provided
689            boolean notOut = pattern != null && !pattern.isOutCapable();
690            boolean hasOut = exchange.hasOut();
691            if (hasOut && !notOut) {
692                // we have a response in out and the pattern is out capable
693                answer = exchange.getOut().getBody();
694            } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
695                // special case where the result is InOptionalOut and with no OUT response
696                // so we should return null to indicate this fact
697                answer = null;
698            } else {
699                // use IN as the response
700                answer = exchange.getIn().getBody();
701            }
702        }
703        return answer;
704    }
705
706    /**
707     * Tests whether the exchange has a fault message set and that its not null.
708     *
709     * @param exchange the exchange
710     * @return <tt>true</tt> if fault message exists
711     */
712    public static boolean hasFaultMessage(Exchange exchange) {
713        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
714        return msg.isFault() && msg.getBody() != null;
715    }
716
717    /**
718     * Tests whether the exchange has already been handled by the error handler
719     *
720     * @param exchange the exchange
721     * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
722     */
723    public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
724        return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
725    }
726
727    /**
728     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
729     * <p/>
730     * Will wait until the future task is complete.
731     *
732     * @param context the camel context
733     * @param future  the future handle
734     * @param type    the expected body response type
735     * @return the result body, can be <tt>null</tt>.
736     * @throws CamelExecutionException is thrown if the processing of the exchange failed
737     */
738    public static <T> T extractFutureBody(CamelContext context, Future<?> future, Class<T> type) {
739        try {
740            return doExtractFutureBody(context, future.get(), type);
741        } catch (InterruptedException e) {
742            throw ObjectHelper.wrapRuntimeCamelException(e);
743        } catch (ExecutionException e) {
744            // execution failed due to an exception so rethrow the cause
745            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
746        } finally {
747            // its harmless to cancel if task is already completed
748            // and in any case we do not want to get hold of the task a 2nd time
749            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
750            future.cancel(true);
751        }
752    }
753
754    /**
755     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
756     * <p/>
757     * Will wait for the future task to complete, but waiting at most the timeout value.
758     *
759     * @param context the camel context
760     * @param future  the future handle
761     * @param timeout timeout value
762     * @param unit    timeout unit
763     * @param type    the expected body response type
764     * @return the result body, can be <tt>null</tt>.
765     * @throws CamelExecutionException is thrown if the processing of the exchange failed
766     * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
767     */
768    public static <T> T extractFutureBody(CamelContext context, Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
769        try {
770            if (timeout > 0) {
771                return doExtractFutureBody(context, future.get(timeout, unit), type);
772            } else {
773                return doExtractFutureBody(context, future.get(), type);
774            }
775        } catch (InterruptedException e) {
776            // execution failed due interruption so rethrow the cause
777            throw ObjectHelper.wrapCamelExecutionException(null, e);
778        } catch (ExecutionException e) {
779            // execution failed due to an exception so rethrow the cause
780            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
781        } finally {
782            // its harmless to cancel if task is already completed
783            // and in any case we do not want to get hold of the task a 2nd time
784            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
785            future.cancel(true);
786        }
787    }
788
789    private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
790        if (result == null) {
791            return null;
792        }
793        if (type.isAssignableFrom(result.getClass())) {
794            return type.cast(result);
795        }
796        if (result instanceof Exchange) {
797            Exchange exchange = (Exchange) result;
798            Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
799            return context.getTypeConverter().convertTo(type, exchange, answer);
800        }
801        return context.getTypeConverter().convertTo(type, result);
802    }
803
804    /**
805     * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead
806     */
807    @Deprecated
808    public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
809        return CamelExchangeException.createExceptionMessage(message, exchange, cause);
810    }
811
812    /**
813     * Strategy to prepare results before next iterator or when we are complete,
814     * which is done by copying OUT to IN, so there is only an IN as input
815     * for the next iteration.
816     *
817     * @param exchange the exchange to prepare
818     */
819    public static void prepareOutToIn(Exchange exchange) {
820        // we are routing using pipes and filters so we need to manually copy OUT to IN
821        if (exchange.hasOut()) {
822            exchange.setIn(exchange.getOut());
823            exchange.setOut(null);
824        }
825    }
826
827    /**
828     * Gets both the messageId and exchangeId to be used for logging purposes.
829     * <p/>
830     * Logging both ids, can help to correlate exchanges which may be redelivered messages
831     * from for example a JMS broker.
832     *
833     * @param exchange the exchange
834     * @return a log message with both the messageId and exchangeId
835     */
836    public static String logIds(Exchange exchange) {
837        String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
838        return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId()  + ")";
839    }
840
841    /**
842     * Copies the exchange but the copy will be tied to the given context
843     *
844     * @param exchange  the source exchange
845     * @param context   the camel context
846     * @return a copy with the given camel context
847     */
848    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
849        return copyExchangeAndSetCamelContext(exchange, context, true);
850    }
851
852    /**
853     * Copies the exchange but the copy will be tied to the given context
854     *
855     * @param exchange  the source exchange
856     * @param context   the camel context
857     * @param handover  whether to handover on completions from the source to the copy
858     * @return a copy with the given camel context
859     */
860    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
861        DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
862        if (exchange.hasProperties()) {
863            answer.setProperties(safeCopy(exchange.getProperties()));
864        }
865        if (handover) {
866            // Need to hand over the completion for async invocation
867            exchange.handoverCompletions(answer);
868        }
869        answer.setIn(exchange.getIn().copy());
870        if (exchange.hasOut()) {
871            answer.setOut(exchange.getOut().copy());
872        }
873        answer.setException(exchange.getException());
874        return answer;
875    }
876
877    /**
878     * Replaces the existing message with the new message
879     *
880     * @param exchange  the exchange
881     * @param newMessage the new message
882     * @param outOnly    whether to replace the message as OUT message
883     */
884    public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) {
885        Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
886        if (outOnly || exchange.hasOut()) {
887            exchange.setOut(newMessage);
888        } else {
889            exchange.setIn(newMessage);
890        }
891
892        // need to de-reference old from the exchange so it can be GC
893        if (old instanceof MessageSupport) {
894            ((MessageSupport) old).setExchange(null);
895        }
896    }
897
898    /**
899     * Gets the original IN {@link Message} this Unit of Work was started with.
900     * <p/>
901     * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()}
902     * is enabled. If its disabled, then <tt>null</tt> is returned.
903     *
904     * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled.
905     */
906    public static Message getOriginalInMessage(Exchange exchange) {
907        Message answer = null;
908
909        // try parent first
910        UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
911        if (uow != null) {
912            answer = uow.getOriginalInMessage();
913        }
914        // fallback to the current exchange
915        if (answer == null) {
916            uow = exchange.getUnitOfWork();
917            if (uow != null) {
918                answer = uow.getOriginalInMessage();
919            }
920        }
921        return answer;
922    }
923
924    @SuppressWarnings("unchecked")
925    private static Map<String, Object> safeCopy(Map<String, Object> properties) {
926        if (properties == null) {
927            return null;
928        }
929
930        Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties);
931
932        // safe copy message history using a defensive copy
933        List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
934        if (history != null) {
935            answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
936        }
937
938        return answer;
939    }
940}